Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run():
    """Execute the TraCI control loop and request a ToC at step 30.

    Prints the lane index of ``ToC_vehicle`` before the first simulation
    step and again whenever it changes.  When the local step counter reaches
    30, ``requestToC`` is called with ``timeTillMRM``.  After every iteration
    ``printToCParams`` is called and stdout is flushed.  The loop ends when
    the simulation expects no more vehicles.
    """
    step = 0
    # Lane index seen in the previous iteration; used to detect lane changes.
    lastLane = traci.vehicle.getLaneIndex(ToC_vehicle)
    t = traci.simulation.getTime()
    print("Time %s: Current lane of veh '%s': %s" % (t, ToC_vehicle, lastLane))
    traci.simulationStep()
    while traci.simulation.getMinExpectedNumber() > 0:
        if step == 30:
            requestToC(ToC_vehicle, timeTillMRM)
            t = traci.simulation.getTime()
            print("Requested ToC of veh0 at t=%s (until t=%s)" % (t, t + timeTillMRM))
        # Report lane changes since the previous iteration.
        newLane = traci.vehicle.getLaneIndex(ToC_vehicle)
        if newLane != lastLane:
            t = traci.simulation.getTime()
            print("Time %s: veh '%s' changed lanes. Current: %s" % (t, ToC_vehicle, newLane))
            lastLane = newLane
        printToCParams(ToC_vehicle, True)
        # Flush so the output appears in step order.
        sys.stdout.flush()
        step += 1
        traci.simulationStep()
# NOTE(review): excerpt from a longer script -- `step` (called as a function
# here), `personID` and the TraCI connection are defined outside these lines.
print("recovering from exception after asking for unknown person")
print("step", step())

# Replace the plan of "newPerson" with a single driving stage to edge "1o"
# on line "B42".
traci.person.removeStages("newPerson")
traci.person.appendDrivingStage("newPerson", "1o", "B42")

# Add vehicle "veh0" on route "r0", assign it to line "B42" and give it a
# stop on edge "3si".
traci.route.add("r0", ["3si", "1o"])
traci.vehicle.addLegacy("veh0", "r0", traci.constants.DEPARTFLAG_TRIGGERED, pos=230)
traci.vehicle.setLine("veh0", "B42")
traci.vehicle.setStop("veh0", "3si", 235, laneIndex=2, startPos=230, duration=1)

print("getIDList", traci.person.getIDList())
print("numVehs=%s, numPersons=%s, minExpected=%s" % (
    traci.vehicle.getIDCount(),
    traci.person.getIDCount(),
    traci.simulation.getMinExpectedNumber()))

# NOTE(review): the excerpt carried no indentation; the loop is assumed to
# cover the step call and the subscription dump -- confirm against the
# original script.
for i in range(10):
    print("step", step())
    print(traci.person.getSubscriptionResults(personID))

# Report who is riding in "veh0" (via device parameter and via direct query)
# and which persons are on the edge "newPerson" is currently on.
print("riding in vehicle: '%s'" % traci.vehicle.getParameter("veh0", "device.person.IDList"))
print("riding in vehicle (direct): '%s'" % traci.vehicle.getPersonIDList("veh0"))
print("persons on edge %s at time %s: %s" % (
    traci.person.getRoadID("newPerson"),
    traci.simulation.getTime(),
    traci.edge.getLastStepPersonIDs(traci.person.getRoadID("newPerson"))))

# Drop the current plan again and append a waiting stage in its place.
traci.person.removeStages("newPerson")
traci.person.appendWaitingStage(
    "newPerson", 10, "Jumped out of a moving vehicle. Ouch!")
import os
import sys

# Make the SUMO python tools importable: use $SUMO_HOME when it is set,
# otherwise fall back to the directory five levels above this file.
SUMO_HOME = os.path.join(os.path.dirname(__file__), "..", "..", "..", "..", "..")
sys.path.append(os.path.join(os.environ.get("SUMO_HOME", SUMO_HOME), "tools"))
import traci  # noqa
import sumolib  # noqa

traci.start([sumolib.checkBinary('sumo'),
             '-c', 'sumo.sumocfg',
             '-n', 'input_net2.net.xml',
             '-a', 'input_additional.add.xml'
             ])
# Advance one step before rerouting vehicle "ego" to parking area 'parkB1B2'.
traci.simulationStep()
traci.vehicle.rerouteParkingArea("ego", 'parkB1B2')
# Run until the simulation expects no more vehicles.
while traci.simulation.getMinExpectedNumber() > 0:
    traci.simulationStep()
traci.close()
def run():
    """Execute the TraCI control loop for the pedestrian-actuated signal.

    Each simulation step, a pending pedestrian request is looked up via
    ``checkWaitingPersons()`` unless one is already latched.  While the
    traffic light ``TLSID`` is in ``VEHICLE_GREEN_PHASE`` the elapsed green
    time is counted; once it exceeds ``MIN_GREEN_TIME`` and a request is
    pending, the light is switched to the following phase and both the
    request flag and the green-time counter are reset.
    """
    # track the duration for which the green phase of the vehicles has been
    # active
    greenTimeSoFar = 0

    # whether the pedestrian button has been pressed
    activeRequest = False

    # main loop. do something every simulation step until no more vehicles are
    # loaded or running
    while traci.simulation.getMinExpectedNumber() > 0:
        traci.simulationStep()

        # decide whether there is a waiting pedestrian and switch if the green
        # phase for the vehicles exceeds its minimum duration
        if not activeRequest:
            activeRequest = checkWaitingPersons()
        if traci.trafficlight.getPhase(TLSID) == VEHICLE_GREEN_PHASE:
            greenTimeSoFar += 1
            if greenTimeSoFar > MIN_GREEN_TIME:
                # check whether someone has pushed the button
                if activeRequest:
                    # switch to the next phase
                    traci.trafficlight.setPhase(
                        TLSID, VEHICLE_GREEN_PHASE + 1)
                    # reset state so the next vehicle green phase starts
                    # with a fresh counter and no pending request
                    activeRequest = False
                    greenTimeSoFar = 0
import os

# The SUMO executable is taken from the environment; a missing SUMO_BINARY
# raises KeyError.
sumoBinary = os.environ["SUMO_BINARY"]
cmd = [sumoBinary,
       '-n', 'input_net2.net.xml',
       '-r', 'input_routes.rou.xml',
       '--stop-output', 'stops.xml',
       '--no-step-log',
       ]
traci.start(cmd)

veh = "veh0"
traci.simulationStep()
# remove first stop
traci.vehicle.setStop(veh, "WC", 50, 0, 0)
# keep second stop and add another one
traci.vehicle.setStop(veh, "CE", 50, 0, 0, until=100)
# Run until the simulation expects no more vehicles.
while traci.simulation.getMinExpectedNumber() > 0:
    traci.simulationStep()
traci.close()
def run():
    """Execute the TraCI control loop with scripted lane changes and a ToC.

    Schedule, keyed on the local step counter:

    * 29 and 49 -- request a change of ``ToC_vehicle`` to lane 1
    * 50 -- call ``requestToC`` with ``timeTillMRM``
    * 51 -- request a change back to lane 0

    The loop ends when the simulation expects no more vehicles.
    """
    step = 0
    lastLane = traci.vehicle.getLaneIndex(ToC_vehicle)
    t = traci.simulation.getTime()
    print("Time %s: Current lane of veh '%s': %s" % (t, ToC_vehicle, lastLane))
    traci.simulationStep()
    while traci.simulation.getMinExpectedNumber() > 0:
        if step == 29:
            traci.vehicle.changeLane(ToC_vehicle, 1, 5)
            t = traci.simulation.getTime()
            print("Time %s: Requested lanechange of veh '%s' to lane %s" % (t, ToC_vehicle, 1))
        if step == 49:
            # Let vehicle go to left lane, which is strategically not adequate
            traci.vehicle.changeLane(ToC_vehicle, 1, 5.)
            t = traci.simulation.getTime()
            print("Time %s: Requested lanechange of veh '%s' to lane %s" % (t, ToC_vehicle, 1))
        if step == 50:
            requestToC(ToC_vehicle, timeTillMRM)
            t = traci.simulation.getTime()
            print("Requested ToC of veh0 at t=%s (until t=%s)" % (t, t + timeTillMRM))
        if step == 51:
            traci.vehicle.changeLane(ToC_vehicle, 0, 5.)
            t = traci.simulation.getTime()
            print("Time %s: Requested lanechange of veh '%s' to lane %s" % (t, ToC_vehicle, 0))
        # Advance the counter and the simulation; without this the schedule
        # above is never reached and the loop cannot terminate.
        step += 1
        traci.simulationStep()
def run():
    """Connect to SUMO on ``traci_port`` and run the per-step control loop.

    After ``initialise()``, every simulation step calls, in order,
    ``set_sumo_inputs()``, ``open_tlc(step)`` and ``set_state()`` until the
    simulation expects no more vehicles.
    """
    traci.init(traci_port)
    # NOTE(review): `step` is never incremented in the lines visible here, so
    # open_tlc() always receives 0 -- confirm whether an increment is missing.
    step = 0
    # amber_state = False
    initialise()
    while traci.simulation.getMinExpectedNumber() > 0:
        traci.simulationStep()
        # if step % 5 == 0:
        #     amber_state ^= True
        set_sumo_inputs()
        open_tlc(step)
        # open_tlc(step, amber_state)
        set_state()
# NOTE(review): excerpt from the body of a per-episode training loop;
# `episode`, `total_t`, the estimator models and `transition_time` are defined
# outside these lines, and the loop body below is cut off before any call that
# advances the simulation.
# The arguments to generate_routefile swap from (100, 0) to (0, 100) from
# episode 15 onwards.
if episode < 15:
    generate_routefile(100, 0)
else:
    generate_routefile(0, 100)
# generate_routefile()
# generate_routefile_random(episode_time, num_vehicles)
traci.load(["--start", "-c", "data/cross_2intersections_nosublane.sumocfg",
            "--tripinfo-output", "tripinfo.xml"])
# Put traffic lights "0" and "10" into phase 0 before reading the first states.
traci.trafficlight.setPhase("0", 0)
traci.trafficlight.setPhase("10", 0)
leftState, rightState = getStates(transition_time)
counter = 0
stride = 0
while traci.simulation.getMinExpectedNumber() > 0:
    print("Episode # ", episode)
    # print("Waiting time on lane 1i_0 = ",getWaitingTime("1i_0"))
    print("Inside episode counter", counter)
    counter += 1
    total_t += 1
    # batch_experience = experience[:batch_history]
    # Every `target_update_time` steps, copy the estimator weights into the
    # target models.
    if total_t % target_update_time == 0:
        target_estimator_model_left.set_weights(q_estimator_model_left.get_weights())
        target_estimator_model_right.set_weights(q_estimator_model_right.get_weights())
    # NOTE(review): assumed to run every iteration, not only on target
    # updates -- the excerpt carried no indentation; confirm.
    q_val_left = q_estimator_model_left.predict(leftState)
    q_val_right = q_estimator_model_right.predict(rightState)
    print("Left q values : ", q_val_left)
# NOTE(review): excerpt with indentation stripped and unbalanced ''' markers.
# The two self-contained '''...''' lines below only parse if the lone ''' a few
# lines down CLOSES a string opened before this excerpt, and the last lone '''
# OPENS one that is closed after it. Under that reading the if/else and the
# trailing print/predict lines are disabled code, and only generate_routefile(),
# traci.start(...), the counters and the bare simulationStep() loop are live.
# Confirm against the original file before re-indenting or re-enabling anything.
num_vehicles += 1
if episode < 40:
generate_routefile(90,10)
else:
generate_routefile(10,90)
'''
generate_routefile()
#generate_routefile_random(episode_time, num_vehicles)
traci.start([sumoBinary, "-c", "data/cross_2intersections.sumocfg",
"--tripinfo-output", "tripinfo.xml"])
'''traci.trafficlight.setPhase("0", 0)'''
'''state = getState(transition_time)'''
counter = 0
stride = 0
while traci.simulation.getMinExpectedNumber() > 0:
traci.simulationStep()
'''
print("Episode # ", episode)
print("Inside episode counter", counter)
counter += 1
total_t += 1
if total_t % target_update_time == 0:
target_estimator_model.set_weights(q_estimator_model.get_weights())
q_val = q_estimator_model.predict(state)
print(q_val)
# NOTE(review): excerpt with indentation stripped; it continues past the last
# line shown, so the nesting of the loops below cannot be confirmed from here.
traci.trafficlight.setPhase("0", 0)
# Length of the policy vector built at the bottom; presumably the number of
# selectable actions (the disabled code below draws from 4 choices) -- confirm.
nA = 4###
state = getState()
# Seed the experience list with `num_history` entries of the initial state
# (assuming the append belongs to the loop body).
experience = []
for i in range(num_history):
experience.append(state)
for episode in range(num_episode):
traci.trafficlight.setPhase("0", 0)
print("INITIAL EXPERIENCE : ")
print(experience)
counter = 0
stride = 0
while traci.simulation.getMinExpectedNumber() > 0:
print("Episode # ", episode)
print("Waiting time on lane 1i_0 = ",getWaitingTime("1i_0"))
print("Inside episode counter",counter)
counter+=1
# The first `batch_history` experience entries are reshaped to
# (1, batch_history, 5) before being passed to the model.
batch_experience = experience[:batch_history]
q_val = model.predict((np.array(batch_experience)).reshape((1,batch_history, 5)))
print(q_val)
# if random.random() < epsilon:
# phase = np.random.choice(4)
# print("random action chosen",phase)
# else:
# phase = np.argmax(q_val)
# print("else action",phase)
# epsilon shrinks as 1 / (episode + 1); policy_s starts as a uniform vector
# with epsilon / nA in each of its nA entries.
epsilon = 1.0 / (episode+1)
policy_s = np.ones(nA) * epsilon / nA