Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def runSingle(traciEndTime, viewRange, objID):
step = 0
traci.start(sumoCall + ["-c", "sumo.sumocfg"])
subscribed = False
while not step > traciEndTime:
traci.simulationStep()
if subscribed:
# check if objID has arrived at destination
arrivedList = traci.simulation.getArrivedIDList()
if objID in arrivedList:
print("[%03d] Vehicle '%s' has arrived at destination" % (step, objID))
break
print("[%03d] Context results for vehicle '%s':" % (step, objID))
for v in sorted(traci.vehicle.getContextSubscriptionResults(objID) or []):
print(v)
if not subscribed:
print("Subscribing to vehicle context of object '%s'" % (objID))
traci.vehicle.subscribeContext(objID, traci.constants.CMD_GET_VEHICLE_VARIABLE,
viewRange, [traci.constants.VAR_POSITION])
# + ['--save-configuration', 'debug.sumocfg']
)
while traci.simulation.getMinExpectedNumber() > 0:
traci.simulationStep()
print("tripinfos at last step: %s" % countWrittenTrips('tripinfos.xml'))
print("logfile at last step: %s" % lastLine('log'))
traci.load(opts + ['--tripinfo-output', 'tripinfos2.xml', '-l', 'log2'])
traci.simulationStep()
print("tripinfos after load: %s" % countWrittenTrips('tripinfos.xml'))
print("logfile after load: %s" % lastLine('log'))
while traci.simulation.getMinExpectedNumber() > 0:
traci.simulationStep()
print("tripinfos2 at last step: %s" % countWrittenTrips('tripinfos2.xml'))
print("logfile2 at last step: %s" % lastLine('log2'))
traci.close()
print("tripinfos2 after close: %s" % countWrittenTrips('tripinfos2.xml'))
print("logfile2 after close: %s" % lastLine('log2'))
# done
def runSingle(traciEndTime, viewRange, objID):
step = 0
traci.start(sumoCall + ["-c", "sumo.sumocfg"])
subscribed = False
while not step > traciEndTime:
traci.simulationStep()
if subscribed:
# check if objID has arrived at destination
arrivedList = traci.simulation.getArrivedIDList()
if objID in arrivedList:
print("[%03d] Vehicle '%s' has arrived at destination" % (step, objID))
break
print("[%03d] Context results for vehicle '%s':" % (step, objID))
for v in sorted(traci.vehicle.getContextSubscriptionResults(objID) or []):
print(v)
if not subscribed:
print("Subscribing to vehicle context of object '%s'" % (objID))
traci.vehicle.subscribeContext(objID, traci.constants.CMD_GET_VEHICLE_VARIABLE,
viewRange, [traci.constants.VAR_POSITION])
# this is the normal way of using traci. sumo is started as a
# subprocess and then the python script connects and runs
traci.start([sumoBinary, "-n", "input_net.net.xml", "-r", "input_routes.rou.xml", "-a",
"input_additional.add.xml,input_additional2.add.xml",
"--no-step-log", "true",
"--default.speeddev", "0"])
# Wait until the vehicle enters
while ToC_vehicle not in traci.vehicle.getIDList():
traci.simulationStep()
printToCParams(ToC_vehicle)
requestToC(ToC_vehicle, timeTillMRM)
for t in range(100):
traci.simulationStep()
if t % 20 == 0:
printToCParams(ToC_vehicle, True)
print("Current type: %s" % traci.vehicle.getTypeID(ToC_vehicle))
requestToC(ToC_vehicle, 0)
for t in range(100):
traci.simulationStep()
if t % 20 == 0:
printToCParams(ToC_vehicle, True)
print("Current type: %s" % traci.vehicle.getTypeID(ToC_vehicle))
traci.close()
def runSingle(addOption):
step = 0
timeline = []
traci.start([sumoBinary, "-c", "sumo.sumocfg"] + addOption)
while not step > 10000:
try:
traci.simulationStep()
vehs = traci.vehicle.getIDList()
timeline.append({})
for v in vehs:
timeline[-1][v] = traci.vehicle.getSpeed(v)
step += 1
except FatalTraCIError:
print("Closed by SUMO")
break
traci.close()
sys.stdout.flush()
return timeline
def getStates(transition_time): # made the order changes
newLeftState = []
newRightState = []
newUpperLeftState = []
newUpperRightState = []
for _ in range(transition_time):
traci.simulationStep()
l_leftcount = 0
l_rightcount = 0
l_topcount = 0
l_bottomcount = 0
r_leftcount = 0
r_rightcount = 0
r_topcount = 0
r_bottomcount = 0
ul_leftcount = 0
ul_rightcount = 0
ul_topcount = 0
ul_bottomcount = 0
def makeMove(action, transition_time, experience):
if action == 1:
traci.trafficlight.setPhase("0", (int(traci.trafficlight.getPhase("0")) + 1) % 4)
for _ in range(transition_time):
traci.simulationStep()
# traci.simulationStep()
# traci.simulationStep()
# traci.simulationStep()
# traci.simulationStep()
#newState = getState()
experience.pop(0)
experience.append(getState())
return experience
self.createTrainRecord()
step = 0
traci.start(sumoCmd)
while step < 7200:
self.simulation(traci, step, record = False, epsilon = 0)
step += 1
traci.close()
meanwaittime = self.getMeanWaitTime()
self.getCsvTrafficLight('csv/model.csv')
self.createRecord()
step = 0
traci.start(sumoCmd)
while step < 7200:
self.addRecord(traci, step)
traci.simulationStep()
step += 1
traci.close()
fixmeanwaittime = self.getMeanWaitTime()
self.getCsvTrafficLight('csv/fix.csv', action = False)
self.logger.debug(" fix : " + str(fixmeanwaittime) + " model: " + str(meanwaittime))
def makeMove(state,action):
traci.trafficlight.setPhase("0",action)
# agent.simulateFrames(SIM_FRAMES)
traci.simulationStep()
traci.simulationStep()
traci.simulationStep()
traci.simulationStep()
traci.simulationStep()
traci.simulationStep()
traci.simulationStep()
traci.simulationStep()
traci.simulationStep()
newState = getState()
return newState
action : an action provided by the environment
Outputs
-------
(observation, reward, done, info)
observation : agent's observation of the current environment
reward [Float] : amount of reward due to the previous action
done : a boolean, indicating whether the episode has ended
info : a dictionary containing other diagnostic information from the previous action
"""
ctrl_ids = self.ctrl_ids if self.fullbool else range(self.num_cars)
new_speed = self._state[0, ctrl_ids] + action # self.ctrl_ids
new_speed[np.where(new_speed < 0)] = 0
for car_idx in range(self.num_cars):
# almost instantaneous
traci.vehicle.slowDown(self.controllable[car_idx], new_speed[car_idx], 1)
traci.simulationStep()
ctrl_ids = self.car_ids if self.fullbool else self.controllable
if self.expandedstate:
self._state = np.array([[traci.vehicle.getSpeed(vID), \
self.get_lane_position(vID), \
traci.vehicle.getLaneIndex(vID)] for vID in ctrl_ids]).T
else:
self._state = np.array([[traci.vehicle.getSpeed(vID) for vID in ctrl_ids]])
# print("IN STEP, STATE", self._state.shape)
# Horizontal concat of vertical vectors
# self._state = np.concatenate((velocities.reshape(len(velocities), 1).T, ), axis=1)
# done = np.all(abs(self._state-self.GOAL_VELOCITY) < self.delta)
next_observation = np.copy(self._state)