Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def runSingle(traciEndTime, viewRange, objID):
step = 0
traci.start(sumoCall + ["-c", "sumo.sumocfg"])
subscribed = False
while not step > traciEndTime:
traci.simulationStep()
if subscribed:
# check if objID has arrived at destination
arrivedList = traci.simulation.getArrivedIDList()
if objID in arrivedList:
print("[%03d] Vehicle '%s' has arrived at destination" % (step, objID))
break
print("[%03d] Context results for vehicle '%s':" % (step, objID))
for v in sorted(traci.vehicle.getContextSubscriptionResults(objID) or []):
print(v)
if not subscribed:
print("Subscribing to vehicle context of object '%s'" % (objID))
traci.vehicle.subscribeContext(objID, traci.constants.CMD_GET_VEHICLE_VARIABLE,
viewRange, [traci.constants.VAR_POSITION])
sys.stdout.flush()
traci.vehicle.addSubscriptionFilterLCManeuver(1)
'-r', 'input_routes.rou.xml',
'--no-step-log',
# '-S', '-Q'
])
vehID = 'v0'
traci.simulationStep()
while traci.simulation.getTime() < 75:
pos = traci.vehicle.getPosition3D(vehID)
pos2 = (pos[0] + 20, pos[1])
traci.vehicle.moveToXY('v0', '', 0, pos2[0], pos2[1])
traci.simulationStep()
pos3 = traci.vehicle.getPosition3D(vehID)
print("step=%s old=%s target=%s mapped=%s" % (
traci.simulation.getTime(), pos, pos2, pos3))
traci.close()
from __future__ import print_function
from __future__ import absolute_import
import os
import sys
SUMO_HOME = os.path.join(os.path.dirname(__file__), "..", "..", "..", "..", "..", "..")
sys.path.append(os.path.join(os.environ.get("SUMO_HOME", SUMO_HOME), "tools"))
import traci # noqa
import sumolib # noqa
traci.start([sumolib.checkBinary('sumo'), "-c", "sumo.sumocfg"])
vehID = "ego"
traci.simulationStep()
traci.vehicle.rerouteTraveltime(vehID)
while traci.simulation.getMinExpectedNumber() > 0:
traci.simulationStep()
traci.close()
PORT = traci.getFreeSocketPort()
sumoBinary = os.environ.get("SUMO_BINARY", os.path.join(
os.path.dirname(sys.argv[0]), '..', '..', "..", '..', 'bin', 'sumo'))
traci.start([sumoBinary] + saveParams)
tend = 55.
traci.simulationStep()
traci.vehicletype.setImperfection("DEFAULT_VEHTYPE", 1.0)
traci.vehicletype.setImperfection("t1", 1.0)
traci.vehicletype.setDecel("t1", 2.0)
traci.vehicletype.setAccel("t1", 3.0)
traci.vehicletype.setApparentDecel("t1", 6.0)
traci.vehicletype.setEmergencyDecel("t1", 7.0)
# traci.vehicletype.setEmissionClass("t1", ??)
traci.vehicletype.setTau("t1", 1.3)
while traci.simulation.getTime() < tend - 10.:
traci.simulationStep()
traci.vehicle.setImperfection("veh2", 1.0)
traci.vehicle.setDecel("veh2", 2.0)
traci.vehicle.setAccel("veh2", 3.0)
traci.vehicle.setApparentDecel("veh2", 6.0)
traci.vehicle.setEmergencyDecel("veh2", 7.0)
# traci.vehicle.setEmissionClass("veh2", ??)
traci.vehicle.setTau("veh2", 1.3)
print("Get before save....")
print("vtype")
print(traci.vehicletype.getImperfection("DEFAULT_VEHTYPE"), "== 1.0 (imperfection)")
print(traci.vehicletype.getImperfection("t1"), "== 1.0 (imperfection)")
print(traci.vehicletype.getDecel("t1"), "== 2.0 (decel)")
from __future__ import print_function
from __future__ import absolute_import
import os
import sys
sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools'))
import traci # noqa
import sumolib # noqa
traci.start([sumolib.checkBinary("sumo"), '-c', 'sumo.sumocfg'])
for i in range(30):
print("step=%s stopWaitingList=%s" % (
traci.simulation.getTime(),
traci.simulation.getBusStopWaitingIDList("busStop1")))
traci.simulationStep()
traci.close()
def run():
"""execute the TraCI control loop"""
step = 0
while step < 500:
traci.simulationStep()
if step == 200:
requestToC(ToC_vehicle, timeTillMRM)
t = traci.simulation.getTime()
print("Requested ToC of veh0 at t=%s (until t=%s)" % (t, t + timeTillMRM))
printToCParams(ToC_vehicle, True)
step += 1
traci.trafficlight.setPhase("0", 0)
traci.trafficlight.setPhase("10", 0)
traci.trafficlight.setPhase("01", 0)
traci.trafficlight.setPhase("11", 0)
nA = 2
target_estimator_model_left.set_weights(q_estimator_model_left.get_weights())
left_replay_memory = []
for _ in range(replay_memory_init_size):
if traci.simulation.getMinExpectedNumber() <= 0:
generate_routefile(100, 0)
traci.load(["--start", "-c", "data/cross_multi.sumocfg",
"--tripinfo-output", "tripinfo.xml"])
leftState, rightState, upperLeftState, upperRightState = getStates(transition_time)
leftAction = np.random.choice(np.arange(nA))
rightAction = np.random.choice(np.arange(nA))
upperLeftAction = np.random.choice(np.arange(nA))
upperRightAction = np.random.choice(np.arange(nA))
newLeftState, newRightState, newUpperLeftState, newUpperRightState = makeMoves(leftAction, rightAction, upperLeftAction, upperRightAction,transition_time)
leftReward = getReward(leftState, newLeftState)
rightReward = getReward(rightState, newRightState)
upperLeftReward = getReward(upperLeftState, newUpperLeftState)
upperRightReward = getReward(upperRightState, newUpperRightState)
left_replay_memory.append([leftState, leftAction, leftReward, newLeftState])
left_replay_memory.append([rightState, rightAction, rightReward, newRightState])
left_replay_memory.append([upperLeftState, upperLeftAction, upperLeftReward, newUpperLeftState])
traci.trafficlight.setPhase("0", 0)
nA = 2 ###
state = getState()
experience = []
for i in range(num_history):
experience.append(state)
for episode in range(num_episode):
traci.trafficlight.setPhase("0", 0)
print("INITIAL EXPERIENCE : ")
print(experience)
counter = 0
stride = 0
while traci.simulation.getMinExpectedNumber() > 0:
print("Episode # ", episode)
# print("Waiting time on lane 1i_0 = ",getWaitingTime("1i_0"))
print("Inside episode counter", counter)
print(experience)
counter += 1
# batch_experience = experience[:batch_history]
q_val = model.predict((np.array(experience)).reshape((1, num_history, 5)))
print(q_val)
# if random.random() < epsilon:
# phase = np.random.choice(4)
# print("random action chosen",phase)
# else:
# phase = np.argmax(q_val)
# print("else action",phase)
epsilon = 1.0 / (episode + 1)
def simulate_next_step():
global last_lights, last_vehicles
start_secs = time.time()
traci.simulationStep()
end_sim_secs = time.time()
# Update Vehicles
for veh_id in traci.simulation.getDepartedIDList():
# SUMO will not resubscribe to vehicles that are already subscribed, so this is safe.
traci.vehicle.subscribe(veh_id, TRACI_VEHICLE_CONSTANTS)
# acquire the relevant vehicle information
ids = tuple(set(traci.vehicle.getIDList() +
traci.simulation.getSubscriptionResults()
[tc.VAR_DEPARTED_VEHICLES_IDS]))
vehicles = {veh_id: vehicle_to_dict(traci.vehicle.getSubscriptionResults(veh_id))
for veh_id in ids}
# Vehicles are automatically unsubscribed upon arrival
# and deleted from vehicle list on next
# timestep. Persons are also automatically unsubscribed.
# See: http://sumo.dlr.de/wiki/TraCI/Object_Variable_Subscription).
# Update persons
# Workaround for people: traci does not return person objects in the getDepartedIDList() call
# See: http://sumo.dlr.de/trac.wsgi/ticket/3477
for ped_id in traci.person.getIDList():
traci.person.subscribe(ped_id, TRACI_PERSON_CONSTANTS)
person_ids = traci.person.getIDList()
persons = {p_id: person_to_dict(traci.person.getSubscriptionResults(p_id))