Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import sumolib # noqa
def ppStages(comment, stages):
print("%s\n %s\n" % (comment, "\n ".join(map(str, stages))))
sumoBinary = os.environ["SUMO_BINARY"]
cmd = [
sumoBinary,
'--begin', '1',
'-n', 'input_net.net.xml',
'--no-step-log', ]
traci.start(cmd)
traci.simulationStep(1.)
print("step", traci.simulation.getTime())
traci.close()
sumoBinary,
'-n', 'input_net.net.xml',
'-r', 'input_routes.rou.xml',
'--lanechange-output', 'lanechanges.xml',
'--lanechange-output.started',
'--lanechange-output.ended',
'--fcd-output', 'fcd.xml',
'--no-step-log',
'--begin', '0',
# ~ '--lateral-resolution', '3.2',
# '-S', '-Q',
'--step-length', '0.1',
'--default.action-step-length', '1.0',
"--default.speeddev", "0"]
traci.start(cmd)
traci.simulationStep()
vehID = "v0"
traci.vehicle.setLaneChangeMode(vehID, 0b0100000000)
traci.vehicle.changeLane(vehID, 1, 0)
for i in range(10):
step = traci.simulation.getTime()
traci.simulationStep(step + 1.)
traci.vehicle.setSpeed(vehID, 0)
for i in range(10):
step = traci.simulation.getTime()
traci.simulationStep(step + 1.)
traci.vehicle.setSpeed(vehID, -1)
for i in range(10):
step = traci.simulation.getTime()
traci.simulationStep(step + 1.)
# step to in between action steps
def runSingle(traciEndTime, downstreamDist, upstreamDist, lanes, opposite, vTypes, vClasses):
step = 0
traci.start(sumoCall + ["-n", "input_net.net.xml", "-r", "input_routes.rou.xml", "--no-step-log", "true"])
subscribed = False
while not step > traciEndTime:
responses = traci.simulationStep()
near1 = set()
if subscribed:
print("Context results for veh '%s':" % egoID)
for v in sorted(traci.vehicle.getContextSubscriptionResults(egoID) or []):
print(v)
near1.add(v)
if not subscribed:
print("Subscribing to context of vehicle '%s'" % (egoID))
traci.vehicle.subscribeContext(egoID, traci.constants.CMD_GET_VEHICLE_VARIABLE, 0.0,
[traci.constants.VAR_POSITION])
print("""Adding subscription filters ...
(downstreamDist=%s, upstreamDist=%s, lanes=%s, opposite=%s
# SPDX-License-Identifier: EPL-2.0
# @file runner.py
# @author Jakob Erdmann
# @date 2018-09-27
import os
import sys
sys.path.append(os.path.join(os.environ["SUMO_HOME"], "tools"))
import traci # noqa
import sumolib # noqa
sumoBinary = sumolib.checkBinary('sumo')
traci.start([sumoBinary,
"-n", "input_net.net.xml",
"-r", "input_routes.rou.xml",
"-b", "2147400", # ~ 2**31 / 1000
"--no-step-log",
])
for i in range(100):
traci.simulationStep()
traci.vehicle.add(str(i), "r0")
traci.close()
def runSingle(traciEndTime, viewRange, objID):
step = 0
traci.start(sumoCall + ["-c", "sumo.sumocfg"])
subscribed = False
while not step > traciEndTime:
traci.simulationStep()
if subscribed:
# check if objID has arrived at destination
arrivedList = traci.simulation.getArrivedIDList()
if objID in arrivedList:
print("[%03d] Vehicle '%s' has arrived at destination" % (step, objID))
break
print("[%03d] Context results for vehicle '%s':" % (step, objID))
for v in sorted(traci.vehicle.getContextSubscriptionResults(objID)):
print(v)
# this is the main entry point of this script
if __name__ == "__main__":
options = get_options()
# this script has been called from the command line. It will start sumo as a
# server, then connect and run
if options.nogui:
sumoBinary = checkBinary('sumo')
else:
sumoBinary = checkBinary('sumo-gui')
# this is the normal way of using traci. sumo is started as a
# subprocess and then the python script connects and runs
traci.start([sumoBinary, "-n", "input_net.net.xml", "-r", "input_routes.rou.xml", "-a",
"input_additional.add.xml", "--fcd-output", "fcd.xml",
"--no-step-log", "true",
"--default.speeddev", "0"])
# Wait until the vehicle enters
while ToC_vehicle not in traci.vehicle.getIDList() or ToC_vehicle2 not in traci.vehicle.getIDList():
traci.simulationStep()
printToCParams(ToC_vehicle)
printToCParams(ToC_vehicle2)
run()
traci.close()
def main(args):
sumo_call = [sumolib.checkBinary('sumo'), "-c", "data/hello.sumocfg",
"--netstate-dump", "rawdump.xml",
"--no-step-log",
"-v",
]
traci.start(sumo_call)
step = -1
while True:
step += 1
traci.simulationStep()
if step == 140:
traci.vehicle.resume('Stapler_00')
if step == 161:
traci.close()
break
def getPreProcessData(self, totaldays):
preData = {}
for i in ni.trafficLights:
preData[i] = []
sumoCmd = ["sumo", "-c", ni.sumocfg, "--ignore-route-errors", "--time-to-teleport", "600"]
for i in range(totaldays):
self.log.debug('collecting preprocess data: ' + str(i) + ' days')
getNewDemand()
traci.start(sumoCmd)
self.createRecord()
step = 0
while step < 7200:
self.addRecord(step, addVehicleRecord = False)
step += 1
for tl in ni.trafficLights:
flag = util.adjustFlag(tl, self.record)
if flag:
feature = util.getFeature(tl, self.record)
preData[tl].append(feature)
traci.simulationStep()
traci.close()
#self.clearRecord()
if (i+1)%10 == 0:
for tl in ni.trafficLights:
a = np.array(preData[tl])
def getPreProcessData(self, totaldays, file):
sumoBinary = "sumo"
sumoCmd = [sumoBinary, "-c", ni.sumocfg, "--ignore-route-errors", "--time-to-teleport", "2000"]
totalfeature = []
for i in range(totaldays):
getNewDemands()
self.logger.debug('collecting days: '+str(i) + ' data')
traci.start(sumoCmd)
step = 0
self.clearRecord()
while step < 7200:
self.addRecord(traci, step, getwaittime = False)
adjustFlag = self.getAdjustFlag()
if adjustFlag:
feature = self.getFeatureList(-1)
totalfeature.append(feature)
traci.simulationStep()
step += 1
traci.close()
tf = np.array(totalfeature)
np.save(file, tf)
def run(self, episode):
# first, generate the route file for this simulation and set up sumo
traffic_mode = generate_routefile(episode, self._max_steps)
traci.start(self._sumoCmd)
# set the epsilon for this episode
self._eps = 1.0 - (episode / self._total_episodes)
# inits
self._steps = 0
tot_neg_reward = 0
old_total_wait = 0
self._waiting_times = {}
self._sum_intersection_queue = 0
# simulation (self._steps updated in function "_simulate")
while self._steps < self._max_steps:
# get current state of the intersection
current_state = self._get_state()