Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, env, ts_id, delta_time, yellow_time, min_green, max_green, phases):
    """Store the signal's settings and install its phase program via TraCI.

    The constructor arguments are copied onto the instance unchanged, the
    lanes controlled by the signal are looked up once, and a program built
    from ``phases`` is pushed to the signal identified by ``ts_id``.
    """
    # Identity and owning environment.
    self.env = env
    self.id = ts_id

    # Timing parameters, kept exactly as supplied by the caller.
    self.delta_time = delta_time
    self.yellow_time = yellow_time
    self.min_green = min_green
    self.max_green = max_green

    # Phase bookkeeping starts at green phase 0 with no time spent on it.
    self.green_phase = 0
    self.time_on_phase = 0.0
    # Half of the supplied phases (rounded down) are counted as green phases.
    self.num_green_phases = len(phases) // 2

    # dict.fromkeys drops repeated lane ids while keeping first-seen order.
    controlled = traci.trafficlight.getControlledLanes(self.id)
    self.lanes = list(dict.fromkeys(controlled))

    # Build a program from ``phases`` and hand it to the signal.
    program = traci.trafficlight.Logic("new-program", 0, 0, phases=phases)
    traci.trafficlight.setCompleteRedYellowGreenDefinition(self.id, program)
def run():
    """Drive traffic light "0" through TraCI until no more vehicles are expected.

    The light is put in phase 2 first. After each simulation step, if the
    light is still in phase 2, it is moved to phase 3 when induction loop "0"
    counted at least one vehicle in the last step; otherwise phase 2 is set
    again. When the loop ends the TraCI connection is closed and stdout is
    flushed.
    """
    ticks = 0
    # Start in phase 2 (east-west green, per the original author's note).
    traci.trafficlight.setPhase("0", 2)
    while traci.simulation.getMinExpectedNumber() > 0:
        traci.simulationStep()
        ticks += 1
        if traci.trafficlight.getPhase("0") != 2:
            # Not in phase 2, i.e. a switch is already under way: leave it.
            continue
        # A detected vehicle (from the north, per the original note) asks for
        # a switch; with none, keep re-asserting phase 2.
        vehicle_waiting = traci.inductionloop.getLastStepVehicleNumber("0") > 0
        traci.trafficlight.setPhase("0", 3 if vehicle_waiting else 2)
    traci.close()
    sys.stdout.flush()
def check():
    """Print a labelled dump of the traffic light named by the global ``tlsID``.

    Emits one line per TraCI query, in a fixed order, each prefixed with a
    short label.
    """
    print("examining", tlsID)
    # (label, traci.trafficlight getter) pairs, in output order.
    queries = (
        ("ryg", "getRedYellowGreenState"),
        ("rygdef", "getCompleteRedYellowGreenDefinition"),
        ("lanes", "getControlledLanes"),
        ("links", "getControlledLinks"),
        ("program", "getProgram"),
        ("phase", "getPhase"),
        ("phaseName", "getPhaseName"),
        ("switch", "getNextSwitch"),
    )
    for label, getter_name in queries:
        # Resolve each getter only when its turn comes, so lines printed
        # before a failing query still appear.
        print(label, getattr(traci.trafficlight, getter_name)(tlsID))
def check():
    """Report the TraCI view of the traffic light identified by global ``tlsID``.

    Each value is fetched and printed immediately, one labelled line per
    query, so output produced before a failing query is still shown.
    """
    print("examining", tlsID)
    tls = traci.trafficlight

    signal_state = tls.getRedYellowGreenState(tlsID)
    print("ryg", signal_state)

    definition = tls.getCompleteRedYellowGreenDefinition(tlsID)
    print("rygdef", definition)

    lanes = tls.getControlledLanes(tlsID)
    print("lanes", lanes)

    links = tls.getControlledLinks(tlsID)
    print("links", links)

    program = tls.getProgram(tlsID)
    print("program", program)

    phase_index = tls.getPhase(tlsID)
    print("phase", phase_index)

    phase_name = tls.getPhaseName(tlsID)
    print("phaseName", phase_name)

    next_switch = tls.getNextSwitch(tlsID)
    print("switch", next_switch)
# Episode loop: iterates `episodes` times.
# NOTE(review): leading indentation was lost in this copy and the excerpt ends
# mid-body, so the nesting of the statements below cannot be read off the
# text -- confirm against the original file before editing.
for e in range(episodes):
# DNN Agent
# Initialize DNN with random weights
# Initialize target network with same weights as DNN Network
#log = open('log.txt', 'a')
# Counters and accumulators are reset to zero here.
step = 0
waiting_time = 0
reward1 = 0
reward2 = 0
# Both rewards were zeroed just above, so this evaluates to 0 at this point.
total_reward = reward1 - reward2
stepz = 0
action = 0
# Start SUMO through TraCI with the cross3ltl configuration and '--start'.
traci.start([sumoBinary, "-c", "cross3ltl.sumocfg", '--start'])
# Put traffic light "0" in phase 0 and set that phase's duration to 200.
traci.trafficlight.setPhase("0", 0)
traci.trafficlight.setPhaseDuration("0", 200)
# Continue while SUMO still expects vehicles and stepz is below 7000.
while traci.simulation.getMinExpectedNumber() > 0 and stepz < 7000:
traci.simulationStep()
# Read the current state and ask the agent for an action on it.
state = sumoInt.getState()
action = agent.act(state)
# The third element of the state is treated as the light component.
light = state[2]
# Taken only when the agent chose action 0 and light[0][0][0] is 0.
if(action == 0 and light[0][0][0] == 0):
# Transition Phase
# presumably the four statements after this `for` form its body (six steps
# with light '0' set to phase 1) -- TODO confirm nesting
for i in range(6):
stepz += 1
traci.trafficlight.setPhase('0', 1)
# Add the halting-vehicle counts of edges 1si, 2si, 3si and 4si to the total.
waiting_time += (traci.edge.getLastStepHaltingNumber('1si') + traci.edge.getLastStepHaltingNumber(
'2si') + traci.edge.getLastStepHaltingNumber('3si') + traci.edge.getLastStepHaltingNumber('4si'))
traci.simulationStep()
# A second counted loop of 10 iterations begins here; the rest of its body is
# not visible in this excerpt.
for i in range(10):
stepz += 1
def makeMoves(leftAction, rightAction, upperLeftAction, upperRightAction, transition_time):
    """Advance every flagged traffic light by one phase, then return getStates().

    An action equal to 1 moves its light to the next phase, wrapping after
    phase 3; any other value leaves that light untouched. The lights are
    handled in the order "0", "10", "01", "11".
    """
    requests = (
        ("0", leftAction),
        ("10", rightAction),
        ("01", upperLeftAction),
        ("11", upperRightAction),
    )
    for tls_id, flag in requests:
        if flag == 1:
            current = int(traci.trafficlight.getPhase(tls_id))
            traci.trafficlight.setPhase(tls_id, (current + 1) % 4)
    return getStates(transition_time)
def makeMove(phase, transition_time):
    """Set traffic light "0" to ``phase``, step the simulation, return getState().

    The simulation is advanced ``transition_time`` steps after the phase is
    set and before the state is read.
    """
    traci.trafficlight.setPhase("0", phase)
    for _tick in range(transition_time):
        traci.simulationStep()
    return getState()
def makeMove(action, transition_time, experience):
    """Apply ``action``, step the simulation, and roll the experience window.

    When ``action`` equals 1, traffic light "0" moves to its next phase
    (wrapping after phase 3). The simulation then advances
    ``transition_time`` steps. Finally the oldest entry of ``experience`` is
    removed, the value of getState() is appended, and the same (mutated)
    ``experience`` object is returned.
    """
    if action == 1:
        current = int(traci.trafficlight.getPhase("0"))
        traci.trafficlight.setPhase("0", (current + 1) % 4)

    for _tick in range(transition_time):
        traci.simulationStep()

    # Drop the oldest entry before sampling the new state, so the window
    # length is unchanged once the append completes.
    experience.pop(0)
    experience.append(getState())
    return experience
# NOTE(review): leading indentation was lost in this copy and the excerpt is
# cut off at both ends, so nesting and the origin of names such as `state`,
# `nA` and `replay_memory` cannot be confirmed from here.
# Take one randomly chosen action and store the resulting transition.
queueLength = getQueueLength()
# Draw one action from the integers 0 .. nA-1.
action = np.random.choice(np.arange(nA))
new_state = makeMove(action,transition_time)
new_queueLength = getQueueLength()
# The reward is computed from the queue length before and after the move.
reward = getReward(queueLength,new_queueLength)
# Stored as [state, action, reward, new_state]; `state` is set outside this excerpt.
replay_memory.append([state,action,reward,new_state])
print(len(replay_memory))
total_t = 0
# Episode loop: iterates `num_episode` times.
for episode in range(num_episode):
# Load the simulation from data/cross.sumocfg with trip info output to tripinfo.xml.
traci.load(["--start", "-c", "data/cross.sumocfg",
"--tripinfo-output", "tripinfo.xml"])
# Put traffic light "0" in phase 0, then take the initial state and queue length.
traci.trafficlight.setPhase("0", 0)
state = getState_baseline(transition_time)
queueLength = getQueueLength()
# Counters and the delay-statistics lists start out empty.
counter = 0
stride = 0
delay_data_avg = []
delay_data_min = []
delay_data_max = []
delay_data_time = []
# Continue while SUMO still expects vehicles; the rest of the loop body is
# not visible in this excerpt.
while traci.simulation.getMinExpectedNumber() > 0:
print("Episode # ", episode)
# print("Waiting time on lane 1i_0 = ",getWaitingTime("1i_0"))
print("Inside episode counter", counter)
def getLeftPhaseState(transition_time):
    """Return a one-hot encoding of traffic light "0"'s current phase.

    The result is a float array of shape ``(transition_time, 4, 4)`` that is
    zero everywhere except at the current phase's index on the last axis,
    where every step and every lane holds 1.
    """
    lane_count, phase_count = 4, 4
    active_phase = traci.trafficlight.getPhase("0")
    encoded = np.zeros((transition_time, lane_count, phase_count))
    # Each slice along the first axis is a (lanes, phases) view; marking the
    # active phase's column sets it for all lanes of that step at once.
    for step_slice in encoded:
        step_slice[:, active_phase] = 1
    return encoded