Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Fragment: print the current TraCI state of traffic light `tlsID`, one
# labelled line per getter.
# NOTE(review): these prints look like the body of a `check()` helper whose
# `def` line is not visible in this excerpt -- confirm against the full file.
print("examining", tlsID)
print("ryg", traci.trafficlight.getRedYellowGreenState(tlsID))
print("rygdef", traci.trafficlight.getCompleteRedYellowGreenDefinition(tlsID))
print("lanes", traci.trafficlight.getControlledLanes(tlsID))
print("links", traci.trafficlight.getControlledLinks(tlsID))
print("program", traci.trafficlight.getProgram(tlsID))
print("phase", traci.trafficlight.getPhase(tlsID))
print("phaseName", traci.trafficlight.getPhaseName(tlsID))
print("switch", traci.trafficlight.getNextSwitch(tlsID))
# Build six phases that all use the same state string. Only the first
# positional argument varies (30, 10, 40, 20, 20, 20) -- presumably the phase
# duration; confirm against the TraCI `Phase` signature. The first phase also
# passes [1, 2, 3] and "setViaComplete" as its 5th and 6th arguments.
phases = []
phases.append(traci.trafficlight.Phase(30, "rrrrGGggrrrrGGgg", 0, 0, [1, 2, 3], "setViaComplete"))
phases.append(traci.trafficlight.Phase(10, "rrrrGGggrrrrGGgg", 0, 0))
phases.append(traci.trafficlight.Phase(40, "rrrrGGggrrrrGGgg", 0, 0))
phases.append(traci.trafficlight.Phase(20, "rrrrGGggrrrrGGgg", 0, 0))
phases.append(traci.trafficlight.Phase(20, "rrrrGGggrrrrGGgg", 0, 0))
phases.append(traci.trafficlight.Phase(20, "rrrrGGggrrrrGGgg", 0, 0))
# Wrap the phases in a Logic object named "custom" and install it on `tlsID`.
logic = traci.trafficlight.Logic("custom", 0, 0, phases)
traci.trafficlight.setCompleteRedYellowGreenDefinition(tlsID, logic)
# Exercise the phase setters: select phase index 4, set a phase name and a
# phase duration of 23.
traci.trafficlight.setPhase(tlsID, 4)
traci.trafficlight.setPhaseName(tlsID, "setByTraCI")
traci.trafficlight.setPhaseDuration(tlsID, 23)
# The label says "waitingPersons" but the call made is getServedPersonCount
# with second argument 2.
print("waitingPersons", traci.trafficlight.getServedPersonCount(tlsID, 2))
# Print the traffic-light state again after the modifications above.
check()
# Report how many program definitions exist and how many phases each one has.
defs = traci.trafficlight.getCompleteRedYellowGreenDefinition(tlsID)
print("numDefs=%s numPhases=%s" % (len(defs), list(map(lambda d: len(d.getPhases()), defs))))
# Subscribe to `tlsID` and print the subscription results right away.
traci.trafficlight.subscribe(tlsID)
print(traci.trafficlight.getSubscriptionResults(tlsID))
# NOTE(review): indentation was lost in this excerpt; the print below is
# presumably the loop body, and the rest of the body may be cut off.
for step in range(3, 6):
print("step", step)
def check():
    """Print the current TraCI state of the traffic light ``tlsID``.

    Emits one labelled line per getter, always in the same order. ``traci``
    and ``tlsID`` are read from module scope; nothing is returned.
    """
    print("examining", tlsID)
    tl = traci.trafficlight
    print("ryg", tl.getRedYellowGreenState(tlsID))
    print("rygdef", tl.getCompleteRedYellowGreenDefinition(tlsID))
    print("lanes", tl.getControlledLanes(tlsID))
    print("links", tl.getControlledLinks(tlsID))
    print("program", tl.getProgram(tlsID))
    print("phase", tl.getPhase(tlsID))
    print("phaseName", tl.getPhaseName(tlsID))
    print("switch", tl.getNextSwitch(tlsID))


# Install a custom six-phase program. Every phase uses the same state string;
# only the first one carries the extra [1, 2, 3] / "setViaComplete" arguments.
phases = [
    traci.trafficlight.Phase(30, "rrrrGGggrrrrGGgg", 0, 0, [1, 2, 3], "setViaComplete"),
]
phases.extend(
    traci.trafficlight.Phase(duration, "rrrrGGggrrrrGGgg", 0, 0)
    for duration in (10, 40, 20, 20, 20)
)
logic = traci.trafficlight.Logic("custom", 0, 0, phases)
traci.trafficlight.setCompleteRedYellowGreenDefinition(tlsID, logic)

# Exercise the phase setters, then print the resulting state.
traci.trafficlight.setPhase(tlsID, 4)
traci.trafficlight.setPhaseName(tlsID, "setByTraCI")
traci.trafficlight.setPhaseDuration(tlsID, 23)
print("waitingPersons", traci.trafficlight.getServedPersonCount(tlsID, 2))
check()

# Report the number of program definitions and the phase count of each.
defs = traci.trafficlight.getCompleteRedYellowGreenDefinition(tlsID)
print("numDefs=%s numPhases=%s" % (len(defs), [len(d.getPhases()) for d in defs]))
traci.trafficlight.subscribe(tlsID)
def check():
    """Print a labelled snapshot of traffic light ``tlsID`` as seen by TraCI.

    Each ``traci.trafficlight`` getter listed below is called with ``tlsID``
    (taken from module scope) and its result is printed after its label.
    """
    print("examining", tlsID)
    getters = (
        ("ryg", "getRedYellowGreenState"),
        ("rygdef", "getCompleteRedYellowGreenDefinition"),
        ("lanes", "getControlledLanes"),
        ("links", "getControlledLinks"),
        ("program", "getProgram"),
        ("phase", "getPhase"),
        ("phaseName", "getPhaseName"),
        ("switch", "getNextSwitch"),
    )
    for label, getter in getters:
        print(label, getattr(traci.trafficlight, getter)(tlsID))


# Custom program: one phase with the extra [1, 2, 3] / "setViaComplete"
# arguments, followed by five plain phases sharing the same state string.
phases = [
    traci.trafficlight.Phase(30, "rrrrGGggrrrrGGgg", 0, 0, [1, 2, 3], "setViaComplete")
] + [
    traci.trafficlight.Phase(duration, "rrrrGGggrrrrGGgg", 0, 0)
    for duration in (10, 40, 20, 20, 20)
]
logic = traci.trafficlight.Logic("custom", 0, 0, phases)
traci.trafficlight.setCompleteRedYellowGreenDefinition(tlsID, logic)

# Exercise the phase setters and print the state afterwards.
traci.trafficlight.setPhase(tlsID, 4)
traci.trafficlight.setPhaseName(tlsID, "setByTraCI")
traci.trafficlight.setPhaseDuration(tlsID, 23)
print("waitingPersons", traci.trafficlight.getServedPersonCount(tlsID, 2))
check()

# Summarise the installed definitions, then subscribe and print the results.
defs = traci.trafficlight.getCompleteRedYellowGreenDefinition(tlsID)
print("numDefs=%s numPhases=%s" % (len(defs), [len(d.getPhases()) for d in defs]))
traci.trafficlight.subscribe(tlsID)
print(traci.trafficlight.getSubscriptionResults(tlsID))
for step in range(3, 6):
# Script entry point (fragment): builds a SumoEnvironment for the 4x4 network
# and creates one QLAgent per traffic signal id for each run.
# NOTE(review): indentation was lost in this excerpt and it ends on the
# `while` header, so the per-step loop body is not visible here.
if __name__ == '__main__':
# alpha and gamma are passed to every QLAgent; decay goes to EpsilonGreedy;
# runs bounds the outer loop.
alpha = 0.1
gamma = 0.99
decay = 1
runs = 1
# Environment with a four-entry phase list (two 35-unit phases, each followed
# by a 2-unit one).
env = SumoEnvironment(net_file='nets/4x4-Lucas/4x4.net.xml',
route_file='nets/4x4-Lucas/4x4c2.rou.xml',
use_gui=True,
num_seconds=40000,
time_to_load_vehicles=300,
max_depart_delay=0,
phases=[
traci.trafficlight.Phase(35, "GGGrrr"), # north-south
traci.trafficlight.Phase(2, "yyyrrr"),
traci.trafficlight.Phase(35, "rrrGGG"), # west-east
traci.trafficlight.Phase(2, "rrryyy")
])
# Runs are numbered from 1 to `runs` inclusive.
for run in range(1, runs+1):
# Reset the environment; the result is indexed by traffic signal id below.
initial_states = env.reset()
# One agent per id in env.ts_ids, each starting from its encoded initial state.
ql_agents = {ts: QLAgent(starting_state=env.encode(initial_states[ts]),
state_space=env.observation_space,
action_space=env.action_space,
alpha=alpha,
gamma=gamma,
exploration_strategy=EpsilonGreedy(initial_epsilon=0.05, min_epsilon=0.005, decay=decay)) for ts in env.ts_ids}
infos = []
# Loop until the '__all__' entry of `done` becomes true.
done = {'__all__': False}
while not done['__all__']:
# Register a factory for the 2way-single-intersection environment. The lambda
# ignores its argument and builds a new SumoEnvironment on every call.
register_env(
    "2way-single-intersection",
    lambda _: SumoEnvironment(
        net_file='nets/2way-single-intersection/single-intersection.net.xml',
        route_file='nets/2way-single-intersection/single-intersection-gen.rou.xml',
        out_csv_name='outputs/2way-single-intersection/a3c-contexts',
        use_gui=False,
        num_seconds=100000,
        time_to_load_vehicles=120,
        max_depart_delay=0,
        phases=[
            traci.trafficlight.Phase(duration, state)
            for duration, state in (
                (32, "GGrrrrGGrrrr"),
                (2, "yyrrrryyrrrr"),
                (32, "rrGrrrrrGrrr"),
                (2, "rryrrrrryrrr"),
                (32, "rrrGGrrrrGGr"),
                (2, "rrryyrrrryyr"),
                (32, "rrrrrGrrrrrG"),
                (2, "rrrrryrrrrry"),
            )
        ],
    ),
)
# Single-agent set-up for the big-intersection network with a
# TrueOnlineSarsaLambda agent.
out_csv = 'outputs/big-intersection/sarsa-f7'  # "+ experiment_time" suffix is disabled
env = SumoEnvironment(
    net_file='nets/big-intersection/big-intersection.net.xml',
    single_agent=True,
    route_file='nets/big-intersection/routes.rou.xml',
    out_csv_name=out_csv,
    use_gui=False,
    num_seconds=5400,
    yellow_time=4,
    min_green=5,
    max_green=60,
    max_depart_delay=300,
    time_to_load_vehicles=0,
    phases=[
        traci.trafficlight.Phase(duration, state)
        for duration, state in (
            (30, "GGGGrrrrrrGGGGrrrrrr"),
            (4, "yyyyrrrrrryyyyrrrrrr"),
            (15, "rrrrGrrrrrrrrrGrrrrr"),
            (4, "rrrryrrrrrrrrryrrrrr"),
            (30, "rrrrrGGGGrrrrrrGGGGr"),
            (4, "rrrrryyyyrrrrrryyyyr"),
            (15, "rrrrrrrrrGrrrrrrrrrG"),
            (4, "rrrrrrrrryrrrrrrrrry"),
        )
    ],
)
fixed_tl = False
agent = TrueOnlineSarsaLambda(
    env.observation_space,
    env.action_space,
    alpha=1e-9,
    gamma=0.95,
    epsilon=0.05,
    lamb=0.1,
    fourier_order=7,
)
# Run loop (fragment): four runs, numbered 1 through 4.
# NOTE(review): indentation was lost in this excerpt and it ends on the
# `while` header, so the work done per step is not visible here.
for run in range(1, 4 +1):
# Reset the environment for this run and clear the termination flag.
obs = env.reset()
done = False
# Branch taken when `fixed_tl` is true; it loops until `done` is set.
if fixed_tl:
while not done:
# Vectorised environment: SubprocVecEnv receives n_cpu factories, each of which
# builds its own single-agent SumoEnvironment for the 2way-single-intersection
# network.
env = SubprocVecEnv([
    lambda: SumoEnvironment(
        net_file='nets/2way-single-intersection/single-intersection.net.xml',
        route_file='nets/2way-single-intersection/single-intersection-gen.rou.xml',
        out_csv_name='outputs/2way-single-intersection/a2c-contexts-5s-vmvm-400k',
        single_agent=True,
        use_gui=True,
        num_seconds=400000,
        min_green=5,
        time_to_load_vehicles=120,
        max_depart_delay=0,
        phases=[
            traci.trafficlight.Phase(duration, state)
            for duration, state in (
                (32, "GGrrrrGGrrrr"),
                (2, "yyrrrryyrrrr"),
                (32, "rrGrrrrrGrrr"),
                (2, "rryrrrrryrrr"),
                (32, "rrrGGrrrrGGr"),
                (2, "rrryyrrrryyr"),
                (32, "rrrrrGrrrrrG"),
                (2, "rrrrryrrrrry"),
            )
        ],
    )
    for _ in range(n_cpu)
])
# Register a factory for the 2x2grid environment. The lambda ignores its
# argument and builds a new SumoEnvironment on every call.
register_env(
    "2x2grid",
    lambda _: SumoEnvironment(
        net_file='nets/2x2grid/2x2.net.xml',
        route_file='nets/2x2grid/2x2.rou.xml',
        out_csv_name='outputs/2x2grid/a3c-contexts',
        use_gui=True,
        num_seconds=100000,
        time_to_load_vehicles=120,
        max_depart_delay=0,
        phases=[
            traci.trafficlight.Phase(duration, state)
            for duration, state in (
                (32, "GGrrrrGGrrrr"),
                (2, "yyrrrryyrrrr"),
                (32, "rrGrrrrrGrrr"),
                (2, "rryrrrrryrrr"),
                (32, "rrrGGrrrrGGr"),
                (2, "rrryyrrrryyr"),
                (32, "rrrrrGrrrrrG"),
                (2, "rrrrryrrrrry"),
            )
        ],
    ),
)
# Register a factory for the 2x2grid environment. The lambda ignores its
# argument and builds a new SumoEnvironment on every call.
register_env(
    "2x2grid",
    lambda _: SumoEnvironment(
        net_file='nets/2x2grid/2x2.net.xml',
        route_file='nets/2x2grid/2x2.rou.xml',
        out_csv_name='outputs/2x2grid/a3c-contexts',
        use_gui=True,
        num_seconds=100000,
        time_to_load_vehicles=120,
        max_depart_delay=0,
        phases=[
            traci.trafficlight.Phase(duration, state)
            for duration, state in (
                (32, "GGrrrrGGrrrr"),
                (2, "yyrrrryyrrrr"),
                (32, "rrGrrrrrGrrr"),
                (2, "rryrrrrryrrr"),
                (32, "rrrGGrrrrGGr"),
                (2, "rrryyrrrryyr"),
                (32, "rrrrrGrrrrrG"),
                (2, "rrrrryrrrrry"),
            )
        ],
    ),
)
# Register a factory for the 2x2grid environment. The lambda ignores its
# argument and builds a new SumoEnvironment on every call.
register_env(
    "2x2grid",
    lambda _: SumoEnvironment(
        net_file='nets/2x2grid/2x2.net.xml',
        route_file='nets/2x2grid/2x2.rou.xml',
        out_csv_name='outputs/2x2grid/a3c-contexts',
        use_gui=True,
        num_seconds=100000,
        time_to_load_vehicles=120,
        max_depart_delay=0,
        phases=[
            traci.trafficlight.Phase(duration, state)
            for duration, state in (
                (32, "GGrrrrGGrrrr"),
                (2, "yyrrrryyrrrr"),
                (32, "rrGrrrrrGrrr"),
                (2, "rryrrrrryrrr"),
                (32, "rrrGGrrrrGGr"),
                (2, "rrryyrrrryyr"),
                (32, "rrrrrGrrrrrG"),
                (2, "rrrrryrrrrry"),
            )
        ],
    ),
)