Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# This script was invoked from the command line: start SUMO as a server,
# then connect to it and run.
sumoBinary = checkBinary('sumo' if options.nogui else 'sumo-gui')

# Usual TraCI workflow: SUMO is launched as a subprocess and this Python
# script connects to it.
sumo_cmd = [
    sumoBinary,
    "-n", "input_net.net.xml",
    "-r", "input_routes.rou.xml",
    "-a", "input_additional.add.xml",
    "--fcd-output", "fcd.xml",
    "--no-step-log", "true",
    "--default.speeddev", "0",
]
traci.start(sumo_cmd)

# Keep stepping until the ToC vehicle shows up in the vehicle ID list.
while True:
    if ToC_vehicle in traci.vehicle.getIDList():
        break
    traci.simulationStep()
printToCParams(ToC_vehicle)

run()

traci.close()
"posLat=%s posLat2=%s") % (mode, x, x2, y, y2, exLane, lane2, exPos, pos2, exPosLat, posLat2))
else:
# (comment, "success")
pass
# Start a headless SUMO instance on the test network and advance one step
# before the route and the vehicle are added.
traci.start([sumolib.checkBinary("sumo"), '-n', 'input_net.net.xml', '--no-step-log'])
traci.simulationStep()
traci.route.add("beg", ["beg"])
traci.vehicle.add(vehID, "beg")

# Argument tuples for check(), one per call, executed in the listed order.
check_cases = (
    (1, 40, 1, 0, "beg_1", 40, 0.0, "left lane"),
    (1, 40, -1.5, 0, "beg_1", 40, 0.0, "shifted to left lane (permissions)"),
    (5, 40, -1.6, 0, "beg_0", 40, 0.0, "right lane (ignore permissions)"),
    (2, 40, -1.5, 0, "beg_1", 40, 0.0, "shifted to left lane (permissions)"),
    (6, 40, -1.6, 0, "beg_0", 40, 0.0, "right lane (ignore permissions)"),
)
for case_args in check_cases:
    check(*case_args)

print("vehicleList", traci.vehicle.getIDList())
traci.close()
print("[%03d] Vehicle '%s' has arrived at destination" % (step, objID))
break
print("[%03d] Context results for vehicle '%s':" % (step, objID))
for v in sorted(traci.vehicle.getContextSubscriptionResults(objID) or []):
print(v)
if not subscribed:
print("Subscribing to vehicle context of object '%s'" % (objID))
traci.vehicle.subscribeContext(objID, traci.constants.CMD_GET_VEHICLE_VARIABLE,
viewRange, [traci.constants.VAR_POSITION])
sys.stdout.flush()
traci.vehicle.addSubscriptionFilterLCManeuver(1)
# advise all vehicles not to change lanes
for vehID in traci.vehicle.getIDList():
traci.vehicle.changeLane(vehID, traci.vehicle.getLaneIndex(vehID), 111)
subscribed = True
step += 1
print("Print ended at step %s" % step)
traci.close()
sys.stdout.flush()
# Called from the command line: bring SUMO up as a server, then connect
# to it and run.
binary_name = 'sumo' if options.nogui else 'sumo-gui'
sumoBinary = checkBinary(binary_name)

# Normal way of using TraCI: SUMO is started as a subprocess and this
# Python script then connects to it.
sumo_args = [
    "-n", "input_net.net.xml",
    "-r", "input_routes.rou.xml",
    "-a", "input_additional.add.xml",
    "--fcd-output", "fcd.xml",
    "--no-step-log", "true",
    "--default.speeddev", "0",
]
traci.start([sumoBinary] + sumo_args)

# Advance the simulation until the ToC vehicle is listed among the vehicles.
while ToC_vehicle not in traci.vehicle.getIDList():
    traci.simulationStep()
printToCParams(ToC_vehicle)

run()

traci.close()
sys.stdout.flush()
if __name__ == "__main__":
    options = get_options()

    # Run from the command line: start SUMO as a server, then connect to it.
    # The GUI binary is used unless --nogui was requested.
    sumoBinary = checkBinary('sumo' if options.nogui else 'sumo-gui')

    # Normal TraCI usage: SUMO is started as a subprocess and this script
    # connects to it.
    sumo_cmd = [
        sumoBinary,
        "-n", "input_net.net.xml",
        "-r", "input_routes.rou.xml",
        "--no-step-log", "true",
    ]
    traci.start(sumo_cmd)

    # Step until the ToC vehicle is present in the vehicle ID list.
    while True:
        if ToC_vehicle in traci.vehicle.getIDList():
            break
        traci.simulationStep()
    printToCParams(ToC_vehicle)

    traci.close()
# Entry from the command line: SUMO is started as a server, after which
# this script connects to it and runs.
if options.nogui:
    binary_name = 'sumo'
else:
    binary_name = 'sumo-gui'
sumoBinary = checkBinary(binary_name)

# Normal TraCI usage: launch SUMO as a subprocess, then connect from Python.
traci.start([
    sumoBinary,
    "-n", "input_net.net.xml",
    "-r", "input_routes.rou.xml",
    "-a", "input_additional.add.xml",
    "--fcd-output", "fcd.xml",
    "--no-step-log", "true",
    "--default.speeddev", "0",
])

# Simulate step by step until the ToC vehicle is in the vehicle ID list.
vehicle_present = ToC_vehicle in traci.vehicle.getIDList()
while not vehicle_present:
    traci.simulationStep()
    vehicle_present = ToC_vehicle in traci.vehicle.getIDList()
printToCParams(ToC_vehicle)

run()

traci.close()
sys.stdout.flush()
def reset(self):
    """
    Restart the SUMO simulation and return the first observation.

    If the environment was initialized before, the open TraCI connection is
    closed first. A new SUMO process is then launched with ``self.cfgfn`` on
    ``self.PORT``, TraCI connects to it, and one simulation step is run.
    On the first call only, the current vehicle IDs are stored in
    ``self.vehIDs`` and printed.

    Outputs
    -------
    observation : copy of ``self._state``, i.e. the speeds of the vehicles
        listed in ``self.controllable``. (Initial reward is assumed to be 0.)
    """
    first_call = not self.initialized
    if not first_call:
        traci.close()

    # SUMO's stdout/stderr are forwarded to this process's own streams.
    launch_cmd = [self.sumoBinary, "-c", self.cfgfn, "--remote-port", str(self.PORT)]
    sumo_proc = subprocess.Popen(launch_cmd, stdout=sys.stdout, stderr=sys.stderr)
    traci.init(self.PORT)
    traci.simulationStep()

    if first_call:
        self.vehIDs = traci.vehicle.getIDList()
        print("ID List in reset", self.vehIDs)
        self.initialized = True

    # State and observation are both the vector of controllable-vehicle speeds.
    speeds = [traci.vehicle.getSpeed(veh) for veh in self.controllable]
    self._state = np.array(speeds)
    return np.copy(self._state)
newState = []
# transition_time_step_leftcount = 0
# transition_time_step_rightcount = 0
# transition_time_step_topcount = 0
# transition_time_step_bottomcount = 0
for _ in range(transition_time):
traci.simulationStep()
leftcount = 0
rightcount = 0
topcount = 0
bottomcount = 0
vehicleList = traci.vehicle.getIDList()
print("Traffic : ")
for id in vehicleList:
x, y = traci.vehicle.getPosition(id)
if x < 110 and x > 60 and y < 130 and y > 120:
leftcount += 1
else:
if x < 120 and x > 110 and y < 110 and y > 60:
bottomcount += 1
else:
if x < 180 and x > 130 and y < 120 and y > 110:
rightcount += 1
else:
if x < 130 and x > 120 and y < 180 and y > 130:
def getQueueLength():
leftcount = 0
rightcount = 0
topcount = 0
bottomcount = 0
vehicleList = traci.vehicle.getIDList()
print("Traffic : ")
for id in vehicleList:
x, y = traci.vehicle.getPosition(id)
if x < 110 and x > 60 and y < 130 and y > 120:
leftcount += 1
else:
if x < 120 and x > 110 and y < 110 and y > 600:
bottomcount += 1
else:
if x < 180 and x > 130 and y < 120 and y > 110:
rightcount += 1
else:
if x < 130 and x > 120 and y < 180 and y > 130:
def simulate_next_step():
global last_lights, last_vehicles
start_secs = time.time()
traci.simulationStep()
end_sim_secs = time.time()
# Update Vehicles
for veh_id in traci.simulation.getDepartedIDList():
# SUMO will not resubscribe to vehicles that are already subscribed, so this is safe.
traci.vehicle.subscribe(veh_id, TRACI_VEHICLE_CONSTANTS)
# acquire the relevant vehicle information
ids = tuple(set(traci.vehicle.getIDList() +
traci.simulation.getSubscriptionResults()
[tc.VAR_DEPARTED_VEHICLES_IDS]))
vehicles = {veh_id: vehicle_to_dict(traci.vehicle.getSubscriptionResults(veh_id))
for veh_id in ids}
# Vehicles are automatically unsubscribed upon arrival
# and deleted from vehicle list on next
# timestep. Persons are also automatically unsubscribed.
# (See: http://sumo.dlr.de/wiki/TraCI/Object_Variable_Subscription).
# Update persons
# Workaround for people: traci does not return person objects in the getDepartedIDList() call
# See: http://sumo.dlr.de/trac.wsgi/ticket/3477
for ped_id in traci.person.getIDList():
traci.person.subscribe(ped_id, TRACI_PERSON_CONSTANTS)
person_ids = traci.person.getIDList()