Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
traci.vehicle.changeLane(vehID, laneIndex, traciEndTime)
# check if vehID has arrived at destination
arrivedList = traci.simulation.getArrivedIDList()
if vehID in arrivedList:
print("[%03d] Vehicle '%s' has arrived at destination" % (step, vehID))
break
print("[%03d] lane %d, lateral pos: %.2f" %
(step, traci.vehicle.getLaneIndex(vehID), traci.vehicle.getLateralLanePosition(vehID)))
sys.stdout.flush()
step += 1
print("Print ended at step %s" % traci.simulation.getTime())
traci.close()
sys.stdout.flush()
# this is the normal way of using traci. sumo is started as a
# subprocess and then the python script connects and runs
traci.start([sumoBinary, "-n", "input_net.net.xml", "-r", "input_routes.rou.xml",
"--no-step-log", "true",
"--default.speeddev", "0"])
# Wait until the vehicle enters
while ToC_vehicle not in traci.vehicle.getIDList():
traci.simulationStep()
printToCParams(ToC_vehicle, extra=True)
run()
traci.close()
sys.stdout.flush()
# print(traci.vehicle.getDecel("veh2") ,"== 2.0")
# print(traci.vehicle.getAccel("veh2") ,"== 3.0")
print(traci.vehicle.getLength("veh2"), "== 4.0")
# print(traci.vehicle.getApparentDecel("veh2") , "== 6.0")
# print(traci.vehicle.getEmergencyDecel("veh2") , "== 7.0")
# print(traci.vehicle.getEmissionClass("veh2") , " == ??")
print(traci.vehicle.getMinGap("veh2"), "== 0.8")
print(traci.vehicle.getSpeedFactor("veh2"), "== 0.9")
print(traci.vehicle.getWidth("veh2"), "== 3.3")
# print(traci.vehicle.getTau("veh2") , "== 1.3")
print(traci.vehicle.getMaxSpeed("veh2"), "== 99.9")
print(traci.vehicle.getActionStepLength("veh2"), "==1.5")
while traci.simulation.getTime() < tend:
traci.simulationStep()
traci.close()
remaining = traci.person.getRemainingStages("p3")
assert(remaining == 1)
# replace current stage
print_remaining_plan("p3", "(before replacement of current stage")
traci.person.replaceStage("p3", 0, stage2)
print_remaining_plan("p3", "(after replacement")
# replace later stage
traci.person.appendStage("p3", stage3)
print_remaining_plan("p3", "(before replacement of next stage")
traci.person.replaceStage("p3", 1, stage4)
print_remaining_plan("p3", "(after replacement")
for i in range(41):
traci.simulationStep()
traci.close()
step = 1
nrEnteredVehicles = 0
sumoStop = False
try:
traciEndStep = math.ceil(traciEndTime / steplength)
while not step > traciEndStep:
traci.simulationStep(step * steplength)
# print(index, "asking for vehicles")
# sys.stdout.flush()
traci.vehicle.getIDList()
nrEnteredVehicles += traci.simulation.getDepartedNumber()
# ~ print(index, "Newly entered vehicles: ", traci.simulation.getDepartedNumber(), "(vehs: ", vehs, ")")
# ~ sys.stdout.flush()
step += 1
endTime = traci.simulation.getTime()
traci.close()
except traci.FatalTraCIError as e:
if str(e) == "connection closed by SUMO":
time.sleep(orderTime * index) # assure ordering of outputs
sumoStop = True
print("client %s: " % index, str(e), " (at TraCIStep %s)" % step)
sys.stdout.flush()
else:
raise
if not sumoStop:
time.sleep(orderTime * index) # assure ordering of outputs
print("Process %s ended at step %s" % (index, endTime))
print("Process %s was informed about %s entered vehicles" % (index, nrEnteredVehicles))
sys.stdout.flush()
def reset(self):
"""
Resets the state of the environment, returning an initial observation.
Outputs
-------
observation : the initial observation of the space. (Initial reward is assumed to be 0.)
"""
# self state is velocity, observation is velocity
# self._state = np.random.uniform(0, self.GOAL_VELOCITY, size=(self.num_cars,))
# print("In reset function", self.initialized)
if self.initialized:
traci.close()
sumoProcess = subprocess.Popen([self.sumoBinary, "-c", self.cfgfn, "--remote-port", str(self.PORT)], stdout=sys.stdout, stderr=sys.stderr)
traci.init(self.PORT)
traci.simulationStep()
if not self.initialized:
self.vehIDs = traci.vehicle.getIDList()
print("ID List in reset", self.vehIDs)
self.initialized = True
self._state = np.array([traci.vehicle.getSpeed(vID) for vID in self.controllable])
observation = np.copy(self._state)
return observation
def disconnect(self):
self.isConnected = False
traci.close()
sys.stdout.flush()
def terminate(self):
super(SimpleVelocityEnvironment, self).terminate()
traci.close()
def cleanup_sumo_simulation(simulation_task):
global last_lights, last_vehicles
if simulation_task:
if simulation_task.cancel():
simulation_task = None
last_vehicles = {}
last_lights = {}
traci.close()
#
set_sumo_inputs()
#
open_tlc(step)
# open_tlc(step, amber_state)
#
set_state()
#
step += 1
#
traci.close()
sys.stdout.flush()