Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
SUMO, Simulation of Urban MObility; see http://sumo.dlr.de/
Copyright (C) 2008-2017 DLR (http://www.dlr.de/) and contributors
This file is part of SUMO.
SUMO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
"""
from __future__ import absolute_import
import struct
from .domain import Domain
from .storage import Storage
from . import constants as tc
_RETURN_VALUE_FUNC = {tc.ID_LIST: Storage.readStringList,
tc.ID_COUNT: Storage.readInt,
tc.VAR_TYPE: Storage.readString,
tc.VAR_POSITION: lambda result: result.read("!dd"),
tc.VAR_COLOR: lambda result: result.read("!BBBB")}
class PoiDomain(Domain):
def __init__(self):
Domain.__init__(self, "poi", tc.CMD_GET_POI_VARIABLE, tc.CMD_SET_POI_VARIABLE,
tc.CMD_SUBSCRIBE_POI_VARIABLE, tc.RESPONSE_SUBSCRIBE_POI_VARIABLE,
tc.CMD_SUBSCRIBE_POI_CONTEXT, tc.RESPONSE_SUBSCRIBE_POI_CONTEXT,
_RETURN_VALUE_FUNC)
def getType(self, poiID):
"""getType(string) -> string
# Type of Number of Controlled Links
result.read("!B")
# Number of Controlled Links
nbControlledLinks = result.read("!i")[0]
controlledLinks = []
for j in range(nbControlledLinks):
result.read("!B") # Type of Link j
link = result.readStringList() # Link j
controlledLinks.append(link)
signals.append(controlledLinks)
return signals
_RETURN_VALUE_FUNC = {tc.TL_RED_YELLOW_GREEN_STATE: Storage.readString,
tc.TL_COMPLETE_DEFINITION_RYG: _readLogics,
tc.TL_CONTROLLED_LANES: Storage.readStringList,
tc.TL_CONTROLLED_LINKS: _readLinks,
tc.TL_CURRENT_PROGRAM: Storage.readString,
tc.TL_CURRENT_PHASE: Storage.readInt,
tc.TL_NEXT_SWITCH: Storage.readInt,
tc.TL_PHASE_DURATION: Storage.readInt}
class TrafficLightsDomain(Domain):
Phase = Phase
Logic = Logic
def __init__(self):
Domain.__init__(self, "trafficlights", tc.CMD_GET_TL_VARIABLE, tc.CMD_SET_TL_VARIABLE,
tc.CMD_SUBSCRIBE_TL_VARIABLE, tc.RESPONSE_SUBSCRIBE_TL_VARIABLE,
tc.CMD_SUBSCRIBE_TL_CONTEXT, tc.RESPONSE_SUBSCRIBE_TL_CONTEXT,
Copyright (C) 2011-2017 DLR (http://www.dlr.de/) and contributors
This file is part of SUMO.
SUMO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
"""
from __future__ import absolute_import
from .domain import Domain
from .storage import Storage
from . import constants as tc
_RETURN_VALUE_FUNC = {tc.LAST_STEP_VEHICLE_NUMBER: Storage.readInt,
tc.LAST_STEP_MEAN_SPEED: Storage.readDouble,
tc.LAST_STEP_VEHICLE_ID_LIST: Storage.readStringList,
tc.LAST_STEP_VEHICLE_HALTING_NUMBER: Storage.readInt}
class MultiEntryExitDomain(Domain):
def __init__(self):
Domain.__init__(self, "multientryexit", tc.CMD_GET_MULTIENTRYEXIT_VARIABLE, None,
tc.CMD_SUBSCRIBE_MULTIENTRYEXIT_VARIABLE, tc.RESPONSE_SUBSCRIBE_MULTIENTRYEXIT_VARIABLE,
tc.CMD_SUBSCRIBE_MULTIENTRYEXIT_CONTEXT, tc.RESPONSE_SUBSCRIBE_MULTIENTRYEXIT_CONTEXT,
_RETURN_VALUE_FUNC)
def getLastStepVehicleNumber(self, detID):
"""getLastStepVehicleNumber(string) -> integer
Returns the number of vehicles that have been within the named multi-entry/multi-exit detector within the last simulation step.
"""
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
"""
from __future__ import absolute_import
from . import constants as tc
from .domain import Domain
from .storage import Storage
_RETURN_VALUE_FUNC = {tc.JAM_LENGTH_METERS: Storage.readDouble,
tc.JAM_LENGTH_VEHICLE: Storage.readInt,
tc.LAST_STEP_MEAN_SPEED: Storage.readDouble,
tc.VAR_POSITION: Storage.readDouble,
tc.VAR_LENGTH: Storage.readDouble,
tc.VAR_LANE_ID: Storage.readString,
tc.LAST_STEP_VEHICLE_ID_LIST: Storage.readStringList,
tc.LAST_STEP_VEHICLE_NUMBER: Storage.readInt,
tc.LAST_STEP_OCCUPANCY: Storage.readDouble,
tc.LAST_STEP_VEHICLE_HALTING_NUMBER: Storage.readInt}
class LaneAreaDomain(Domain):
def __init__(self, name="lanearea", deprecatedFor=None):
Domain.__init__(self, name, tc.CMD_GET_LANEAREA_VARIABLE, None,
tc.CMD_SUBSCRIBE_LANEAREA_VARIABLE, tc.RESPONSE_SUBSCRIBE_LANEAREA_VARIABLE,
tc.CMD_SUBSCRIBE_LANEAREA_CONTEXT, tc.RESPONSE_SUBSCRIBE_LANEAREA_CONTEXT,
_RETURN_VALUE_FUNC, deprecatedFor)
def getJamLengthVehicle(self, detID):
"""getJamLengthVehicle(string) -> integer
tc.VAR_POSITION: lambda result: result.read("!dd"),
tc.VAR_ANGLE: Storage.readDouble,
tc.VAR_ROAD_ID: Storage.readString,
tc.VAR_TYPE: Storage.readString,
tc.VAR_ROUTE_ID: Storage.readString,
tc.VAR_COLOR: lambda result: result.read("!BBBB"),
tc.VAR_LANEPOSITION: Storage.readDouble,
tc.VAR_LENGTH: Storage.readDouble,
tc.VAR_WAITING_TIME: Storage.readDouble,
tc.VAR_WIDTH: Storage.readDouble,
tc.VAR_MINGAP: Storage.readDouble,
tc.VAR_NEXT_EDGE: Storage.readString,
tc.VAR_STAGE: Storage.readInt,
tc.VAR_STAGES_REMAINING: Storage.readInt,
tc.VAR_VEHICLE: Storage.readString,
tc.VAR_EDGES: Storage.readStringList,
}
class PersonDomain(Domain):
DEPART_NOW = -3
def __init__(self):
Domain.__init__(self, "person", tc.CMD_GET_PERSON_VARIABLE, tc.CMD_SET_PERSON_VARIABLE,
tc.CMD_SUBSCRIBE_PERSON_VARIABLE, tc.RESPONSE_SUBSCRIBE_PERSON_VARIABLE,
tc.CMD_SUBSCRIBE_PERSON_CONTEXT, tc.RESPONSE_SUBSCRIBE_PERSON_CONTEXT,
_RETURN_VALUE_FUNC)
def getSpeed(self, personID):
"""getSpeed(string) -> double
Returns the speed in m/s of the named person within the last step.
"""
from __future__ import absolute_import
from .domain import Domain
from .storage import Storage
import struct
from . import constants as tc
_RETURN_VALUE_FUNC = {tc.VAR_TIME_STEP: Storage.readInt,
tc.VAR_LOADED_VEHICLES_NUMBER: Storage.readInt,
tc.VAR_LOADED_VEHICLES_IDS: Storage.readStringList,
tc.VAR_DEPARTED_VEHICLES_NUMBER: Storage.readInt,
tc.VAR_DEPARTED_VEHICLES_IDS: Storage.readStringList,
tc.VAR_ARRIVED_VEHICLES_NUMBER: Storage.readInt,
tc.VAR_ARRIVED_VEHICLES_IDS: Storage.readStringList,
tc.VAR_PARKING_STARTING_VEHICLES_NUMBER: Storage.readInt,
tc.VAR_PARKING_STARTING_VEHICLES_IDS: Storage.readStringList,
tc.VAR_PARKING_ENDING_VEHICLES_NUMBER: Storage.readInt,
tc.VAR_PARKING_ENDING_VEHICLES_IDS: Storage.readStringList,
tc.VAR_STOP_STARTING_VEHICLES_NUMBER: Storage.readInt,
tc.VAR_STOP_STARTING_VEHICLES_IDS: Storage.readStringList,
tc.VAR_STOP_ENDING_VEHICLES_NUMBER: Storage.readInt,
tc.VAR_STOP_ENDING_VEHICLES_IDS: Storage.readStringList,
tc.VAR_MIN_EXPECTED_VEHICLES: Storage.readInt,
tc.VAR_BUS_STOP_WAITING: Storage.readInt,
tc.VAR_TELEPORT_STARTING_VEHICLES_NUMBER: Storage.readInt,
tc.VAR_TELEPORT_STARTING_VEHICLES_IDS: Storage.readStringList,
tc.VAR_TELEPORT_ENDING_VEHICLES_NUMBER: Storage.readInt,
tc.VAR_TELEPORT_ENDING_VEHICLES_IDS: Storage.readStringList,
tc.VAR_DELTA_T: Storage.readInt,
tc.VAR_NET_BOUNDING_BOX: lambda result: (result.read("!dd"), result.read("!dd"))}
def __init__(self, name, cmdGetID, cmdSetID,
subscribeID, subscribeResponseID,
contextID, contextResponseID,
retValFunc, deprecatedFor=None):
self._name = name
self._cmdGetID = cmdGetID
self._cmdSetID = cmdSetID
self._subscribeID = subscribeID
self._subscribeResponseID = subscribeResponseID
self._contextID = contextID
self._contextResponseID = contextResponseID
self._retValFunc = {tc.ID_LIST: Storage.readStringList,
tc.ID_COUNT: Storage.readInt}
self._retValFunc.update(retValFunc)
self._deprecatedFor = deprecatedFor
self._connection = None
_defaultDomains.append(self)
setattr(traci, name, self)
tc.VAR_CO2EMISSION: Storage.readDouble,
tc.VAR_COEMISSION: Storage.readDouble,
tc.VAR_HCEMISSION: Storage.readDouble,
tc.VAR_PMXEMISSION: Storage.readDouble,
tc.VAR_NOXEMISSION: Storage.readDouble,
tc.VAR_FUELCONSUMPTION: Storage.readDouble,
tc.VAR_NOISEEMISSION: Storage.readDouble,
tc.VAR_ELECTRICITYCONSUMPTION: Storage.readDouble,
tc.LAST_STEP_MEAN_SPEED: Storage.readDouble,
tc.LAST_STEP_OCCUPANCY: Storage.readDouble,
tc.LAST_STEP_LENGTH: Storage.readDouble,
tc.VAR_CURRENT_TRAVELTIME: Storage.readDouble,
tc.LAST_STEP_VEHICLE_NUMBER: Storage.readInt,
tc.LAST_STEP_VEHICLE_HALTING_NUMBER: Storage.readInt,
tc.LAST_STEP_VEHICLE_ID_LIST: Storage.readStringList,
tc.LAST_STEP_PERSON_ID_LIST: Storage.readStringList,
}
class EdgeDomain(Domain):
def __init__(self):
Domain.__init__(self, "edge", tc.CMD_GET_EDGE_VARIABLE, tc.CMD_SET_EDGE_VARIABLE,
tc.CMD_SUBSCRIBE_EDGE_VARIABLE, tc.RESPONSE_SUBSCRIBE_EDGE_VARIABLE,
tc.CMD_SUBSCRIBE_EDGE_CONTEXT, tc.RESPONSE_SUBSCRIBE_EDGE_CONTEXT,
_RETURN_VALUE_FUNC)
def getAdaptedTraveltime(self, edgeID, time):
"""getAdaptedTraveltime(string, double) -> double
Returns the travel time value (in s) used for (re-)routing
which is valid on the edge at the given time.
SUMO, Simulation of Urban MObility; see http://sumo.dlr.de/
Copyright (C) 2008-2017 DLR (http://www.dlr.de/) and contributors
This file is part of SUMO.
SUMO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
"""
from __future__ import absolute_import
from . import constants as tc
from .domain import Domain
from .storage import Storage
_RETURN_VALUE_FUNC = {tc.VAR_EDGES: Storage.readStringList}
class RouteDomain(Domain):
def __init__(self):
Domain.__init__(self, "route", tc.CMD_GET_ROUTE_VARIABLE, tc.CMD_SET_ROUTE_VARIABLE,
tc.CMD_SUBSCRIBE_ROUTE_VARIABLE, tc.RESPONSE_SUBSCRIBE_ROUTE_VARIABLE,
tc.CMD_SUBSCRIBE_ROUTE_CONTEXT, tc.RESPONSE_SUBSCRIBE_ROUTE_CONTEXT,
_RETURN_VALUE_FUNC)
def getEdges(self, routeID):
"""getEdges(string) -> list(string)
Returns a list of all edges in the route.
"""
return self._getUniversal(tc.VAR_EDGES, routeID)
tc.VAR_CO2EMISSION: Storage.readDouble,
tc.VAR_COEMISSION: Storage.readDouble,
tc.VAR_HCEMISSION: Storage.readDouble,
tc.VAR_PMXEMISSION: Storage.readDouble,
tc.VAR_NOXEMISSION: Storage.readDouble,
tc.VAR_FUELCONSUMPTION: Storage.readDouble,
tc.VAR_NOISEEMISSION: Storage.readDouble,
tc.VAR_ELECTRICITYCONSUMPTION: Storage.readDouble,
tc.LAST_STEP_MEAN_SPEED: Storage.readDouble,
tc.LAST_STEP_OCCUPANCY: Storage.readDouble,
tc.LAST_STEP_LENGTH: Storage.readDouble,
tc.VAR_WAITING_TIME: Storage.readDouble,
tc.VAR_CURRENT_TRAVELTIME: Storage.readDouble,
tc.LAST_STEP_VEHICLE_NUMBER: Storage.readInt,
tc.LAST_STEP_VEHICLE_HALTING_NUMBER: Storage.readInt,
tc.LAST_STEP_VEHICLE_ID_LIST: Storage.readStringList}
class LaneDomain(Domain):
def __init__(self):
Domain.__init__(self, "lane", tc.CMD_GET_LANE_VARIABLE, tc.CMD_SET_LANE_VARIABLE,
tc.CMD_SUBSCRIBE_LANE_VARIABLE, tc.RESPONSE_SUBSCRIBE_LANE_VARIABLE,
tc.CMD_SUBSCRIBE_LANE_CONTEXT, tc.RESPONSE_SUBSCRIBE_LANE_CONTEXT,
_RETURN_VALUE_FUNC)
def getLength(self, laneID):
"""getLength(string) -> double
Returns the length in m.
"""
return self._getUniversal(tc.VAR_LENGTH, laneID)