Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def check(x, y, angle, exLane, exPos, exPosLat, comment):
traci.vehicle.moveToXY(vehID, "", 0, x, y, angle, keepRoute=2)
traci.simulationStep()
x2, y2 = traci.vehicle.getPosition(vehID)
lane2 = traci.vehicle.getLaneID(vehID)
pos2 = traci.vehicle.getLanePosition(vehID)
posLat2 = traci.vehicle.getLateralLanePosition(vehID)
if (abs(x - x2) > 0.1 or
abs(y - y2) > 0.1 or
(exLane is not None and exLane != lane2) or
(exPos is not None and abs(exPos - pos2) > 0.1) or
(exPosLat is not None and abs(exPosLat - posLat2) > 0.1)):
print(comment, ("failed: x=%s, x2=%s, y=%s, y2=%s, lane=%s, lane2=%s, pos=%s, pos2=%s " +
"posLat=%s posLat2=%s") % (x, x2, y, y2, exLane, lane2, exPos, pos2, exPosLat, posLat2))
else:
# print(comment, "success")
pass
lastVehID = ""
traciEndStep = math.ceil(traciEndTime / steplength)
while not step > traciEndStep:
print("Process %s:" % (i))
print(" stepping (step %s)..." % step)
traci.simulationStep(step * steplength)
vehs = traci.vehicle.getIDList()
if len(vehs) != 0:
vehID = vehs[0]
if vehID != lastVehID and lastVehID != "":
print(" breaking execution: traced vehicle '%s' left." % lastVehID)
break
else:
lastVehID = vehID
print(" Retrieving position for vehicle '%s' -> %s on lane '%s'" %
(vehID, traci.vehicle.getLanePosition(vehID), traci.vehicle.getLaneID(vehID)))
print(" Retrieving speed for vehicle '%s' -> %s" % (vehID, traci.vehicle.getSpeed(vehID)))
traci.vehicle.setSpeedMode(vehID, 0)
newSpeed = i * 5
print(" Setting speed for vehicle '%s' -> %s" % (vehID, newSpeed))
print(" Retrieving speed for vehicle '%s' -> %s" % (vehID, traci.vehicle.getSpeed(vehID)))
traci.vehicle.setSpeed(vehID, newSpeed)
elif lastVehID != "":
print(" breaking execution: traced vehicle '%s' left." % lastVehID)
break
step += 1
sys.stdout.flush()
endTime = traci.simulation.getTime()
traci.close()
time.sleep(orderTime * i) # assure ordering of outputs
print("Process %s (order %s) ended at step %s" % (i, index, endTime))
sys.stdout.flush()
def checkOffRoad(vehID):
print(("veh", vehID,
"speed", traci.vehicle.getSpeed(vehID),
"pos", posToString(traci.vehicle.getPosition(vehID)),
"pos3d", posToString(traci.vehicle.getPosition3D(vehID)),
"angle", traci.vehicle.getAngle(vehID),
"road", traci.vehicle.getRoadID(vehID),
"lane", traci.vehicle.getLaneID(vehID),
"lanePos", traci.vehicle.getLanePosition(vehID),
"CO2", traci.vehicle.getCO2Emission(vehID)
))
def run():
"""execute the TraCI control loop"""
step = 0
printToCParams(ToC_vehicle, False, extra=True)
t = traci.simulation.getTime()
# prevent LCs
origLCMode = traci.vehicle.getLaneChangeMode(ToC_vehicle)
traci.vehicle.setLaneChangeMode(ToC_vehicle, 768)
restoredLCMode = False
traci.simulationStep()
while traci.simulation.getMinExpectedNumber() > 0:
t = traci.simulation.getTime()
print("Time %s: Current lane of veh '%s': %s" % (t, ToC_vehicle, traci.vehicle.getLaneID(ToC_vehicle)))
printToCParams(ToC_vehicle, True)
if (not restoredLCMode and traci.vehicle.getParameter(ToC_vehicle, "device.toc.state") == "RECOVERING"):
traci.vehicle.setLaneChangeMode(ToC_vehicle, origLCMode)
# ~ restoredLCMode = True
print("Switched to manual mode: Allowing LCs!")
sys.stdout.flush()
step += 1
traci.simulationStep()
def check(mode, x, y, angle, exLane, exPos, exPosLat, comment):
traci.vehicle.moveToXY(vehID, "", 0, x, y, angle, keepRoute=mode)
traci.simulationStep()
x2, y2 = traci.vehicle.getPosition(vehID)
lane2 = traci.vehicle.getLaneID(vehID)
pos2 = traci.vehicle.getLanePosition(vehID)
posLat2 = traci.vehicle.getLateralLanePosition(vehID)
exactXY = mode & 2 != 0
if (
(exactXY and abs(x - x2) > 0.1)
or (exactXY and abs(y - y2) > 0.1)
or exLane != lane2
or (exPos is not None and abs(exPos - pos2) > 0.1)
or (exPosLat is not None and abs(exPosLat - posLat2) > 0.1)):
print(comment, ("failed: mode=%s x=%s, x2=%s, y=%s, y2=%s, lane=%s, lane2=%s, pos=%s, pos2=%s " +
"posLat=%s posLat2=%s") % (mode, x, x2, y, y2, exLane, lane2, exPos, pos2, exPosLat, posLat2))
else:
# (comment, "success")
pass
Note 2: The main complexity of this algorithm resides in the lack of TraCI functionalities in order to get
retrieve the state of a hidden lane linking two lanes. This could also be highly improved by using a
dictionary built when starting the software.
"""
mPriorityVehicles.acquire()
managedTlls = [];
length = len(priorityVehicles)
# Checking if the next traffic light has to be changed for each priority vehicleId in the simulation
for prioVehIndex in range(0, length):
vehicleId = priorityVehicles[prioVehIndex]
# Getting route and position information for the current priority vehicleId
mtraci.acquire()
route = traci.vehicle.getRoute(vehicleId)
currentLane = traci.vehicle.getLaneID(vehicleId)
mtraci.release()
if currentLane != '' and not isJunction(currentLane):
# Getting speed information for space and time predictions
mtraci.acquire()
lanePosition = traci.vehicle.getLanePosition(vehicleId)
laneLength = traci.lane.getLength(currentLane)
mtraci.release()
currentEdge = getEdgeFromLane(currentLane)
currentEdgeIndex = route.index(currentEdge)
remainingLength = laneLength - lanePosition
edge = currentEdge
lane = currentLane
edgeIndex = currentEdgeIndex
lanes = [traci.vehicle.getLaneIndex(vehicles[0])]
lane_ids = [traci.vehicle.getLaneID(vehicles[0])]
for i in range(1, len(vehicles)):
traci.vehicle.setLaneChangeMode(vehicles[i], utils.FIX_LC)
self._topology[vehicles[i]] = {"leader": vehicles[0], "front": vehicles[i - 1]}
self._states.append(self.WAITING)
utils.set_par(vehicles[i], cc.PAR_ACTIVE_CONTROLLER, cc.CACC)
min_gap = traci.vehicle.getMinGap(vehicles[i])
# The CACC controller tries to keep (desired_gap + min_gap)
# meters as intra-platoon spacing
utils.set_par(vehicles[i], cc.PAR_CACC_SPACING, desired_gap - min_gap)
traci.vehicle.setSpeedFactor(vehicles[i], 1.05)
lanes.append(traci.vehicle.getLaneIndex(vehicles[i]))
lane_ids.append(traci.vehicle.getLaneID(vehicles[i]))
self._members = vehicles
self._desired_gap = desired_gap
self._safe_gap = safe_gap
self._merging = merging
self._splitting = False
self._vehicles_splitting = [False] * len(vehicles)
leader_max_speed = traci.vehicle.getAllowedSpeed(vehicles[0])
lane = traci.vehicle.getLaneID(vehicles[0])
road_max_speed = traci.lane.getMaxSpeed(lane)
# If the platoon is fast enough merge all vehicles to the leftmost
# lane, else merge to the rightmost lane
self._desired_lane = max(lanes) if leader_max_speed > 0.9 * road_max_speed else min(lanes)
self._desired_lane_id = lane_ids[lanes.index(self._desired_lane)]
# meters as intra-platoon spacing
utils.set_par(vehicles[i], cc.PAR_CACC_SPACING, desired_gap - min_gap)
traci.vehicle.setSpeedFactor(vehicles[i], 1.05)
lanes.append(traci.vehicle.getLaneIndex(vehicles[i]))
lane_ids.append(traci.vehicle.getLaneID(vehicles[i]))
self._members = vehicles
self._desired_gap = desired_gap
self._safe_gap = safe_gap
self._merging = merging
self._splitting = False
self._vehicles_splitting = [False] * len(vehicles)
leader_max_speed = traci.vehicle.getAllowedSpeed(vehicles[0])
lane = traci.vehicle.getLaneID(vehicles[0])
road_max_speed = traci.lane.getMaxSpeed(lane)
# If the platoon is fast enough merge all vehicles to the leftmost
# lane, else merge to the rightmost lane
self._desired_lane = max(lanes) if leader_max_speed > 0.9 * road_max_speed else min(lanes)
self._desired_lane_id = lane_ids[lanes.index(self._desired_lane)]
if lane_change:
for vehicle in self._members:
if traci.vehicle.getLaneIndex != self._desired_lane:
utils.change_lane(vehicle, self._desired_lane)
def initialize_simulation(self):
if not self.initial_config:
# Get initial state of each vehicle, and store in initial config object
for veh_id in self.ids:
edge_id = traci.vehicle.getRoadID(veh_id)
route_id = traci.vehicle.getRouteID(veh_id)
lane_id = traci.vehicle.getLaneID(veh_id)
lane_index = traci.vehicle.getLaneIndex(veh_id)
lane_pos = traci.vehicle.getLanePosition(veh_id)
pos = traci.vehicle.getPosition(veh_id)
angle = traci.vehicle.getAngle(veh_id)
speed = traci.vehicle.getSpeed(veh_id)
self.initial_config[veh_id] = (edge_id, route_id, lane_id, lane_index, lane_pos, pos, angle, speed)
self.reset()
:param safe_gap: safety distance between vehicles for lane changes
:param merging: whether the new platoon is in a maneuver or not
:type vehicles: list[str]
:type desired_gap: float
:type safe_gap: float
:type merging: bool
"""
self._topology = dict()
self._states = [self.IDLE]
if len(vehicles) > 1:
traci.vehicle.setLaneChangeMode(vehicles[0], utils.FIX_LC)
utils.set_par(vehicles[0], cc.PAR_ACTIVE_CONTROLLER, cc.ACC)
traci.vehicle.setSpeedFactor(vehicles[0], 1)
lanes = [traci.vehicle.getLaneIndex(vehicles[0])]
lane_ids = [traci.vehicle.getLaneID(vehicles[0])]
for i in range(1, len(vehicles)):
traci.vehicle.setLaneChangeMode(vehicles[i], utils.FIX_LC)
self._topology[vehicles[i]] = {"leader": vehicles[0], "front": vehicles[i - 1]}
self._states.append(self.WAITING)
utils.set_par(vehicles[i], cc.PAR_ACTIVE_CONTROLLER, cc.CACC)
min_gap = traci.vehicle.getMinGap(vehicles[i])
# The CACC controller tries to keep (desired_gap + min_gap)
# meters as intra-platoon spacing
utils.set_par(vehicles[i], cc.PAR_CACC_SPACING, desired_gap - min_gap)
traci.vehicle.setSpeedFactor(vehicles[i], 1.05)
lanes.append(traci.vehicle.getLaneIndex(vehicles[i]))
lane_ids.append(traci.vehicle.getLaneID(vehicles[i]))
self._members = vehicles