Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
This file is part of SUMO.
SUMO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
"""
from __future__ import absolute_import
from .domain import Domain
from .storage import Storage
from . import constants as tc
_RETURN_VALUE_FUNC = {tc.VAR_POSITION: lambda result: result.read("!dd"),
tc.VAR_SHAPE: Storage.readShape}
class JunctionDomain(Domain):
def __init__(self):
Domain.__init__(self, "junction", tc.CMD_GET_JUNCTION_VARIABLE, tc.CMD_SET_JUNCTION_VARIABLE,
tc.CMD_SUBSCRIBE_JUNCTION_VARIABLE, tc.RESPONSE_SUBSCRIBE_JUNCTION_VARIABLE,
tc.CMD_SUBSCRIBE_JUNCTION_CONTEXT, tc.RESPONSE_SUBSCRIBE_JUNCTION_CONTEXT,
_RETURN_VALUE_FUNC)
def getPosition(self, junctionID):
"""getPosition(string) -> (double, double)
Returns the coordinates of the center of the junction.
"""
return self._getUniversal(tc.VAR_POSITION, junctionID)
def getShape(self, junctionID):
"""getShape(string) -> list((double, double))
def __init__(self):
Domain.__init__(self, "route", tc.CMD_GET_ROUTE_VARIABLE, tc.CMD_SET_ROUTE_VARIABLE,
tc.CMD_SUBSCRIBE_ROUTE_VARIABLE, tc.RESPONSE_SUBSCRIBE_ROUTE_VARIABLE,
tc.CMD_SUBSCRIBE_ROUTE_CONTEXT, tc.RESPONSE_SUBSCRIBE_ROUTE_CONTEXT,
_RETURN_VALUE_FUNC)
(at your option) any later version.
"""
from __future__ import absolute_import
import struct
from .domain import Domain
from .storage import Storage
from . import constants as tc
_RETURN_VALUE_FUNC = {tc.ID_LIST: Storage.readStringList,
tc.ID_COUNT: Storage.readInt,
tc.VAR_TYPE: Storage.readString,
tc.VAR_POSITION: lambda result: result.read("!dd"),
tc.VAR_COLOR: lambda result: result.read("!BBBB")}
class PoiDomain(Domain):
def __init__(self):
Domain.__init__(self, "poi", tc.CMD_GET_POI_VARIABLE, tc.CMD_SET_POI_VARIABLE,
tc.CMD_SUBSCRIBE_POI_VARIABLE, tc.RESPONSE_SUBSCRIBE_POI_VARIABLE,
tc.CMD_SUBSCRIBE_POI_CONTEXT, tc.RESPONSE_SUBSCRIBE_POI_CONTEXT,
_RETURN_VALUE_FUNC)
def getType(self, poiID):
"""getType(string) -> string
Returns the (abstract) type of the poi.
"""
return self._getUniversal(tc.VAR_TYPE, poiID)
def getPosition(self, poiID):
"""getPosition(string) -> (double, double)
def __init__(self):
Domain.__init__(self, "multientryexit", tc.CMD_GET_MULTIENTRYEXIT_VARIABLE, None,
tc.CMD_SUBSCRIBE_MULTIENTRYEXIT_VARIABLE, tc.RESPONSE_SUBSCRIBE_MULTIENTRYEXIT_VARIABLE,
tc.CMD_SUBSCRIBE_MULTIENTRYEXIT_CONTEXT, tc.RESPONSE_SUBSCRIBE_MULTIENTRYEXIT_CONTEXT,
_RETURN_VALUE_FUNC)
def __init__(self):
Domain.__init__(self, "vehicletype", tc.CMD_GET_VEHICLETYPE_VARIABLE, tc.CMD_SET_VEHICLETYPE_VARIABLE,
tc.CMD_SUBSCRIBE_VEHICLETYPE_VARIABLE, tc.RESPONSE_SUBSCRIBE_VEHICLETYPE_VARIABLE,
tc.CMD_SUBSCRIBE_VEHICLETYPE_CONTEXT, tc.RESPONSE_SUBSCRIBE_VEHICLETYPE_CONTEXT,
_RETURN_VALUE_FUNC)
def __init__(self):
Domain.__init__(self, "inductionloop", tc.CMD_GET_INDUCTIONLOOP_VARIABLE, None,
tc.CMD_SUBSCRIBE_INDUCTIONLOOP_VARIABLE, tc.RESPONSE_SUBSCRIBE_INDUCTIONLOOP_VARIABLE,
tc.CMD_SUBSCRIBE_INDUCTIONLOOP_CONTEXT, tc.RESPONSE_SUBSCRIBE_INDUCTIONLOOP_CONTEXT,
_RETURN_VALUE_FUNC)
typeID = result.readString()
data.append([vehID, length, entryTime, leaveTime, typeID])
return data
_RETURN_VALUE_FUNC = {tc.VAR_POSITION: Storage.readDouble,
tc.VAR_LANE_ID: Storage.readString,
tc.LAST_STEP_VEHICLE_NUMBER: Storage.readInt,
tc.LAST_STEP_MEAN_SPEED: Storage.readDouble,
tc.LAST_STEP_VEHICLE_ID_LIST: Storage.readStringList,
tc.LAST_STEP_OCCUPANCY: Storage.readDouble,
tc.LAST_STEP_LENGTH: Storage.readDouble,
tc.LAST_STEP_TIME_SINCE_DETECTION: Storage.readDouble,
tc.LAST_STEP_VEHICLE_DATA: readVehicleData}
class InductionLoopDomain(Domain):
def __init__(self):
Domain.__init__(self, "inductionloop", tc.CMD_GET_INDUCTIONLOOP_VARIABLE, None,
tc.CMD_SUBSCRIBE_INDUCTIONLOOP_VARIABLE, tc.RESPONSE_SUBSCRIBE_INDUCTIONLOOP_VARIABLE,
tc.CMD_SUBSCRIBE_INDUCTIONLOOP_CONTEXT, tc.RESPONSE_SUBSCRIBE_INDUCTIONLOOP_CONTEXT,
_RETURN_VALUE_FUNC)
def getPosition(self, loopID):
"""getPosition(string) -> double
Returns the position measured from the beginning of the lane in meters.
"""
return self._getUniversal(tc.VAR_POSITION, loopID)
def getLaneID(self, loopID):
"""getLaneID(string) -> string