Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
traci.simulationStep()
if subscribed:
# check if objID has arrived at destination
arrivedList = traci.simulation.getArrivedIDList()
if objID in arrivedList:
print("[%03d] Vehicle '%s' has arrived at destination" % (step, objID))
break
print("[%03d] Context results for vehicle '%s':" % (step, objID))
for v in sorted(traci.vehicle.getContextSubscriptionResults(objID) or []):
print(v)
if not subscribed:
print("Subscribing to vehicle context of object '%s'" % (objID))
traci.vehicle.subscribeContext(objID, traci.constants.CMD_GET_VEHICLE_VARIABLE,
viewRange, [traci.constants.VAR_POSITION])
sys.stdout.flush()
laneList = map(int, sys.argv[3].strip('[]').split(','))
traci.vehicle.addSubscriptionFilterLanes(laneList)
traci.vehicle.addSubscriptionFilterUpstreamDistance(float(sys.argv[4]))
traci.vehicle.addSubscriptionFilterDownstreamDistance(float(sys.argv[5]))
# advice all vehicle not to change lanes
for vehID in traci.vehicle.getIDList():
traci.vehicle.changeLane(vehID, traci.vehicle.getLaneIndex(vehID), 111)
subscribed = True
step += 1
step = 0
traci.start(sumoCall + ["-n", "input_net.net.xml", "-r", "input_routes.rou.xml", "--no-step-log", "true"])
subscribed = False
while not step > traciEndTime:
responses = traci.simulationStep()
near1 = set()
if subscribed:
print("Context results for veh '%s':" % egoID)
for v in sorted(traci.vehicle.getContextSubscriptionResults(egoID) or []):
print(v)
near1.add(v)
if not subscribed:
print("Subscribing to context of vehicle '%s' (range=%s)" % (egoID, range))
traci.vehicle.subscribeContext(egoID, traci.constants.CMD_GET_VEHICLE_VARIABLE,
range, [traci.constants.VAR_POSITION])
print("Adding field of vision subscription filter ... (openingAngle=%s)" % (openingAngle))
sys.stdout.flush()
if testWithIncompatibleFilter:
traci.vehicle.addSubscriptionFilterLanes([0])
traci.vehicle.addSubscriptionFilterFieldOfVision(openingAngle)
subscribed = True
step += 1
traci.vehicle.unsubscribeContext(egoID, traci.constants.CMD_GET_VEHICLE_VARIABLE, 0.0)
responses = traci.simulationStep()
if responses:
print("Error: Unsubscribe did not work")
else:
print("Ok: Unsubscribe successful")
print("Print ended at step %s" % traci.simulation.getTime())
traci.close()
import sumolib # noqa
WATCH = False
sumoBinary = 'sumo-gui' if WATCH else os.environ["SUMO_BINARY"]
cmd = [
sumoBinary,
'-n', 'input_net.net.xml',
'--no-step-log', ]
if not WATCH:
cmd += ['-S', '-Q']
traci.start(cmd)
ANGLE_UNDEF = traci.constants.INVALID_DOUBLE_VALUE
INVALID = traci.constants.INVALID_DOUBLE_VALUE
vehID = "v0"
def check(x, y, angle, exLane, exPos, exPosLat, comment):
traci.vehicle.moveToXY(vehID, "", 0, x, y, angle, keepRoute=2)
traci.simulationStep()
x2, y2 = traci.vehicle.getPosition(vehID)
lane2 = traci.vehicle.getLaneID(vehID)
pos2 = traci.vehicle.getLanePosition(vehID)
posLat2 = traci.vehicle.getLateralLanePosition(vehID)
if (abs(x - x2) > 0.1 or
abs(y - y2) > 0.1 or
(exLane is not None and exLane != lane2) or
(exPos is not None and abs(exPos - pos2) > 0.1) or
(exPosLat is not None and abs(exPosLat - posLat2) > 0.1)):
traci.vehicle.setEmissionClass(electricVeh, "Energy/unknown")
try:
print(traci.vehicle.getParameter(electricVeh, "device.foo.bar"))
except traci.TraCIException as e:
if traci.isLibsumo():
print(e, file=sys.stderr)
print("recovering from exception after asking for unknown device")
try:
print(traci.vehicle.getParameter(electricVeh, "device.battery.foobar"))
except traci.TraCIException as e:
if traci.isLibsumo():
print(e, file=sys.stderr)
print("recovering from exception after asking for unknown device parameter")
traci.vehicle.subscribe(electricVeh, [tc.VAR_POSITION, tc.VAR_POSITION3D])
for i in range(10):
step()
print(('%s speed="%s" consumed="%s" charged="%s" cap="%s" maxCap="%s" station="%s" mass=%s emissionClass=%s ' +
'electricityConsumption=%s') % (
electricVeh,
traci.vehicle.getSpeed(electricVeh),
traci.vehicle.getParameter(electricVeh, "device.battery.energyConsumed"),
traci.vehicle.getParameter(electricVeh, "device.battery.energyCharged"),
traci.vehicle.getParameter(electricVeh, "device.battery.actualBatteryCapacity"),
traci.vehicle.getParameter(electricVeh, "device.battery.maximumBatteryCapacity"),
traci.vehicle.getParameter(electricVeh, "device.battery.chargingStationId"),
traci.vehicle.getParameter(electricVeh, "device.battery.vehicleMass"),
traci.vehicle.getEmissionClass(electricVeh),
traci.vehicle.getElectricityConsumption(electricVeh),
))
print(sorted(traci.vehicle.getSubscriptionResults(electricVeh).items()))
(downstreamDist=%s, upstreamDist=%s, lanes=%s, opposite=%s
vTypes:%s, vClasses:%s)""" % (downstreamDist, upstreamDist, lanes, opposite, vTypes, vClasses))
sys.stdout.flush()
traci.vehicle.addSubscriptionFilterDownstreamDistance(downstreamDist)
traci.vehicle.addSubscriptionFilterUpstreamDistance(upstreamDist)
traci.vehicle.addSubscriptionFilterLanes(lanes)
if vClasses:
traci.vehicle.addSubscriptionFilterVClass(vClasses)
if vTypes:
traci.vehicle.addSubscriptionFilterVType(vTypes)
if not opposite:
traci.vehicle.addSubscriptionFilterNoOpposite()
subscribed = True
step += 1
traci.vehicle.unsubscribeContext(egoID, traci.constants.CMD_GET_VEHICLE_VARIABLE, 0.0)
responses = traci.simulationStep()
if responses:
print("Error: Unsubscribe did not work")
else:
print("Ok: Unsubscribe successful")
print("Print ended at step %s" % traci.simulation.getTime())
traci.close()
sys.stdout.flush()
for v in pos:
if egoPos:
if math.sqrt(dist2(egoPos, pos[v])) < viewRange:
near2.add(v)
if shape:
lastP = shape[0]
for p in shape[1:]:
if math.sqrt(distToSegmentSquared(pos[v], lastP, p)) < viewRange:
near2.add(v)
lastP = p
if not subscribed:
module.subscribeContext(objID, traci.constants.CMD_GET_VEHICLE_VARIABLE, viewRange, [
traci.constants.VAR_POSITION])
module.subscribeContext(objID, traci.constants.CMD_GET_PERSON_VARIABLE, viewRange, [
traci.constants.VAR_POSITION])
subscribed = True
else:
seen1 += len(near1)
seen2 += len(near2)
for v in near1:
if v not in near2:
print("timestep %s: %s is missing in surrounding objects" % (step, v))
for v in near2:
if v not in near1:
print("timestep %s: %s is missing in subscription results" % (step, v))
step += 1
module.unsubscribeContext(
objID, traci.constants.CMD_GET_VEHICLE_VARIABLE, viewRange)
responses = traci.simulationStep()
if responses:
def add(routeID, edges):
"""add(string, list(string)) -> None
Adds a new route with the given id consisting of the given list of edge IDs.
"""
traci._beginMessage(tc.CMD_SET_ROUTE_VARIABLE, tc.ADD, routeID,
1 + 4 + sum(map(len, edges)) + 4 * len(edges))
traci._message.packStringList(edges)
traci._sendExact()
def _getUniversal(varID, poiID):
result = traci._sendReadOneStringCmd(tc.CMD_GET_POI_VARIABLE, varID, poiID)
return _RETURN_VALUE_FUNC[varID](result)
- Modifies the state of all vehicle to match their state at the current
time step.
- Introduces newly departed vehicles and remove vehicles that exited
the network.
Parameters
----------
vehicle_obs: dict
vehicle observations provided from sumo via subscriptions
sim_obs: dict
simulation observations provided from sumo via subscriptions
env: Environment type
state of the environment at the current time step
"""
# remove exiting vehicles from the vehicles class
for veh_id in sim_obs[tc.VAR_ARRIVED_VEHICLES_IDS]:
if veh_id not in sim_obs[tc.VAR_TELEPORT_STARTING_VEHICLES_IDS]:
self.remove(veh_id)
else:
# this is meant to resolve the KeyError bug when there are
# collisions
vehicle_obs[veh_id] = self.__sumo_observations[veh_id]
# add entering vehicles into the vehicles class
for veh_id in sim_obs[tc.VAR_DEPARTED_VEHICLES_IDS]:
veh_type = env.traci_connection.vehicle.getTypeID(veh_id)
self.add_departed(veh_id, veh_type, env)
if env.time_counter == 0:
# if the time_counter is 0, this we need to reset all necessary
# values
for veh_id in self.__ids:
def _addVehicle(self, vehID):
'''_addVehicle(string)
Creates a new PVehicle object and registers is soliton platoon
'''
try:
traci.vehicle.subscribe(vehID, (tc.VAR_ROAD_ID, tc.VAR_LANE_INDEX, tc.VAR_LANE_ID, tc.VAR_SPEED))
veh = simpla._pvehicle.PVehicle(vehID, self._controlInterval)
except TraCIException:
warn("Tried to create non-existing vehicle '%s'" % vehID)
return
except KeyError as e:
raise e
if cfg.VERBOSITY >= 2:
report("Adding vehicle '%s'" % vehID)
self._connectedVehicles[vehID] = veh
self._platoons[veh.getPlatoon().getID()] = veh.getPlatoon()