Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Instantiate the configuration template: the "end" key is substituted into
# the contents of sumo.sumocfg via %-formatting and the result is written to
# used.sumocfg. The context manager closes both files even if formatting fails.
with open("sumo.sumocfg") as fdi, open("used.sumocfg", "w") as fdo:
    fdo.write(fdi.read() % {"end": sumoEndTime})

step = 0
# Launch SUMO with the generated configuration, listening on PORT for TraCI.
sumoProcess = subprocess.Popen(
    "%s -c used.sumocfg -S -Q --remote-port %s" % (sumoBinary, PORT), shell=True, stdout=sys.stdout)
traci.init(PORT)
while step <= traciEndTime:
    traci.simulationStep()
    vehs = traci.vehicle.getIDList()
    # Membership test instead of vehs.index("horiz") < 0: index() never
    # returns a negative value, it raises ValueError when the id is missing,
    # so the original check could only crash, never report.
    if "horiz" not in vehs or len(vehs) > 3:
        print("Something is wrong")
    step += 1
print("Print ended at step %s" % traci.simulation.getTime())
traci.close()
# Wait for the SUMO process to terminate before flushing the collected output.
sumoProcess.wait()
sys.stdout.flush()
def run():
    """Execute the TraCI control loop.

    Performs 500 simulation steps. Right after the simulation step taken at
    loop index 200 a ToC is requested for ``ToC_vehicle``, and at loop index
    202 for ``ToC_vehicle2``, each with ``timeTillMRM``. After every step
    ``printToCParams`` is called for both vehicles.
    """
    # Loop index -> vehicle for which a ToC is requested at that index.
    toc_schedule = {200: ToC_vehicle, 202: ToC_vehicle2}
    for tick in range(500):
        traci.simulationStep()
        if tick in toc_schedule:
            veh_id = toc_schedule[tick]
            requestToC(veh_id, timeTillMRM)
            now = traci.simulation.getTime()
            print("Requested ToC of '%s' at t=%s (until t=%s)" % (veh_id, now, now + timeTillMRM))
        printToCParams(ToC_vehicle, True)
        printToCParams(ToC_vehicle2, True)
# Excerpt from a stepping loop: the loop header and the definitions of step,
# objID, viewRange and subscribed are outside this excerpt.
print("[%03d] Context results for vehicle '%s':" % (step, objID))
# "or []" lets a falsy result (presumably None before the context
# subscription exists -- confirm against the TraCI version in use) be
# iterated as empty instead of failing inside sorted().
for v in sorted(traci.vehicle.getContextSubscriptionResults(objID) or []):
    print(v)
if not subscribed:
    # Subscribe once, then restrict the subscription to the lanes [-1, 0, 1].
    print("Subscribing to vehicle context of object '%s'" % (objID))
    traci.vehicle.subscribeContext(objID, traci.constants.CMD_GET_VEHICLE_VARIABLE,
                                   viewRange, [traci.constants.VAR_POSITION])
    sys.stdout.flush()
    traci.vehicle.addSubscriptionFilterLanes([-1, 0, 1])
    subscribed = True
step += 1
# NOTE(review): the statements below presumably run after the stepping loop
# has finished; indentation was lost in this excerpt -- confirm.
print("Print ended at %s" % traci.simulation.getTime())
traci.close()
sys.stdout.flush()
# Excerpt from a stepping loop: the loop header and the definitions of step,
# objID, viewRange and subscribed are outside this excerpt.
print("[%03d] Context results for vehicle '%s':" % (step, objID))
# A falsy result is treated as "no results".
context_results = traci.vehicle.getContextSubscriptionResults(objID) or []
for entry in sorted(context_results):
    print(entry)
if not subscribed:
    # Subscribe once and attach the lane-change-maneuver filter with argument 1.
    print("Subscribing to vehicle context of object '%s'" % (objID))
    traci.vehicle.subscribeContext(
        objID,
        traci.constants.CMD_GET_VEHICLE_VARIABLE,
        viewRange,
        [traci.constants.VAR_POSITION],
    )
    sys.stdout.flush()
    traci.vehicle.addSubscriptionFilterLCManeuver(1)
    subscribed = True
step += 1
# NOTE(review): the statements below presumably run after the stepping loop
# has finished; indentation was lost in this excerpt -- confirm.
print("Print ended at %s" % traci.simulation.getTime())
traci.close()
sys.stdout.flush()
def run():
    """Execute the TraCI control loop.

    Issues a lane-change command for "follower" first, then performs 500
    simulation steps. Right after the simulation step taken at loop index 5
    a ToC is requested for ``ToC_vehicle`` with ``timeTillMRM``. After every
    step ``printToCParams`` is called for ``ToC_vehicle``.
    """
    # Keep follower behind the ToC_vehicle
    traci.vehicle.changeLane("follower", 0, 200)
    for tick in range(500):
        traci.simulationStep()
        if tick == 5:
            requestToC(ToC_vehicle, timeTillMRM)
            now = traci.simulation.getTime()
            deadline = now + timeTillMRM
            print("Requested ToC of veh0 at t=%s (until t=%s)" % (now, deadline))
        printToCParams(ToC_vehicle, True)
def step():
    """Advance the simulation by one step.

    Returns the simulation time that was read *before* the step was taken.
    """
    time_before_step = traci.simulation.getTime()
    traci.simulationStep()
    return time_before_step
# Excerpt: the leading statements presumably continue a "not yet subscribed"
# branch inside a stepping loop whose headers precede this excerpt; the
# definitions of downstreamDist, upstreamDist, lanes, opposite, egoID,
# subscribed and step are likewise outside it.
sys.stdout.flush()
# Filters are added in this fixed order: downstream distance, upstream
# distance, lanes and, unless `opposite` is set, the no-opposite filter.
traci.vehicle.addSubscriptionFilterDownstreamDistance(downstreamDist)
traci.vehicle.addSubscriptionFilterUpstreamDistance(upstreamDist)
traci.vehicle.addSubscriptionFilterLanes(lanes)
if not opposite:
    traci.vehicle.addSubscriptionFilterNoOpposite()
subscribed = True
step += 1
# NOTE(review): the statements below presumably run after the stepping loop
# has finished; indentation was lost in this excerpt -- confirm.
# Drop the context subscription, then take one more step: any truthy return
# value from that step is reported as a failed unsubscribe.
traci.vehicle.unsubscribeContext(egoID, traci.constants.CMD_GET_VEHICLE_VARIABLE, 0.0)
responses = traci.simulationStep()
print("Error: Unsubscribe did not work" if responses else "Ok: Unsubscribe successful")
print("Print ended at step %s" % traci.simulation.getTime())
traci.close()
sys.stdout.flush()
def step():
    """Take one simulation step and return the pre-step simulation time."""
    # Read the time first so the returned value refers to the state before stepping.
    previous_time = traci.simulation.getTime()
    traci.simulationStep()
    return previous_time
# Excerpt from a client routine: index, traciEndTime, steplength and orderTime
# are defined outside this excerpt.
traci.setOrder(index)
step = 1
nrEnteredVehicles = 0
sumoStop = False
try:
    # Number of client steps needed to reach traciEndTime, rounded up.
    traciEndStep = math.ceil(traciEndTime / steplength)
    while step <= traciEndStep:
        # Advance to the absolute target time of this client step.
        traci.simulationStep(step * steplength)
        # The id list is requested but its result is not used.
        traci.vehicle.getIDList()
        nrEnteredVehicles += traci.simulation.getDepartedNumber()
        step += 1
    endTime = traci.simulation.getTime()
    traci.close()
except traci.FatalTraCIError as err:
    reason = str(err)
    # Only a connection closed by SUMO is handled here; anything else propagates.
    if reason != "connection closed by SUMO":
        raise
    time.sleep(orderTime * index)  # assure ordering of outputs
    sumoStop = True
    print("client %s: " % index, reason, " (at TraCIStep %s)" % step)
    sys.stdout.flush()
if not sumoStop:
    # NOTE(review): indentation was lost in this excerpt; the summary lines
    # below are assumed to all belong to this branch (endTime is only set
    # when the loop finished without SUMO closing the connection) -- confirm.
    time.sleep(orderTime * index)  # assure ordering of outputs
    print("Process %s ended at step %s" % (index, endTime))
    print("Process %s was informed about %s entered vehicles" % (index, nrEnteredVehicles))
    sys.stdout.flush()
def sim_step(self):
    """
    Return the current simulation time as reported by
    ``traci.simulation.getTime()``.
    """
    current_time = traci.simulation.getTime()
    return current_time