Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def getPosition(self, junctionID):
    """getPosition(string) -> (double, double)

    Returns the coordinates of the center of the junction.

    The value is obtained by querying tc.VAR_POSITION for *junctionID*
    through the domain's generic _getUniversal lookup.
    """
    center = self._getUniversal(tc.VAR_POSITION, junctionID)
    return center
def getPosition(self, vehID):
    """getPosition(string) -> (double, double)

    Returns the position of the named vehicle within the last step [m,m].

    The value is obtained by querying tc.VAR_POSITION for *vehID*
    through the domain's generic _getUniversal lookup.
    """
    position = self._getUniversal(tc.VAR_POSITION, vehID)
    return position
def setPosition(self, poiID, x, y):
    """setPosition(string, double, double) -> None

    Sets the position coordinates of the poi.

    A CMD_SET_POI_VARIABLE / VAR_POSITION message is started on the
    connection, the 2D position payload is appended, and the message is sent.
    """
    conn = self._connection
    # Announced payload length: 1 type byte + two 8-byte doubles ("!Bdd").
    conn._beginMessage(tc.CMD_SET_POI_VARIABLE, tc.VAR_POSITION, poiID, 1 + 8 + 8)
    conn._string += struct.pack("!Bdd", tc.POSITION_2D, x, y)
    conn._sendExact()
SUMO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
"""
from __future__ import absolute_import
import struct
from .domain import Domain
from .storage import Storage
from . import constants as tc
# Decoder table: variable id (from the constants module, `tc`) -> callable
# that reads the matching value from a result object.
# Scalar and list values use the unbound Storage.read* methods; position and
# color use lambdas that unpack a fixed struct format ("!dd" = two doubles,
# "!BBBB" = four unsigned bytes).
# NOTE(review): the domain class that consumes this table is not visible
# next to it here - confirm against the owning module.
_RETURN_VALUE_FUNC = {tc.ID_LIST: Storage.readStringList,
tc.ID_COUNT: Storage.readInt,
tc.VAR_SPEED: Storage.readDouble,
tc.VAR_POSITION: lambda result: result.read("!dd"),
tc.VAR_ANGLE: Storage.readDouble,
tc.VAR_ROAD_ID: Storage.readString,
tc.VAR_TYPE: Storage.readString,
tc.VAR_ROUTE_ID: Storage.readString,
tc.VAR_COLOR: lambda result: result.read("!BBBB"),
tc.VAR_LANEPOSITION: Storage.readDouble,
tc.VAR_LENGTH: Storage.readDouble,
tc.VAR_WAITING_TIME: Storage.readDouble,
tc.VAR_WIDTH: Storage.readDouble,
tc.VAR_MINGAP: Storage.readDouble,
tc.VAR_NEXT_EDGE: Storage.readString,
tc.VAR_STAGE: Storage.readInt,
tc.VAR_STAGES_REMAINING: Storage.readInt,
tc.VAR_VEHICLE: Storage.readString,
tc.VAR_EDGES: Storage.readStringList,
}
def getPosition(self, loopID):
    """getPosition(string) -> double

    Returns the position measured from the beginning of the lane in meters.

    The value is obtained by querying tc.VAR_POSITION for *loopID*
    through the domain's generic _getUniversal lookup.
    """
    lane_offset = self._getUniversal(tc.VAR_POSITION, loopID)
    return lane_offset
def _readNextTLS(result):
    """Decode a compound "next traffic lights" answer from *result*.

    Consumes the compound header and the entry count, then one record per
    entry. Returns a list of (tlsID, tlsIndex, dist, state) tuples in the
    order they were read; state is converted to a one-character string.
    """
    # Compound header: numCompounds followed by the TYPE_INT tag.
    result.read("!iB")
    count = result.read("!i")[0]
    upcoming = []
    for _ in range(count):
        # Skip the type tag that precedes the id string.
        result.read("!B")
        tls_id = result.readString()
        # Layout is tag/int, tag/double, tag/byte - the odd slots hold the values.
        fields = result.read("!BiBdBB")
        link_index, distance, state_code = fields[1], fields[3], fields[5]
        upcoming.append((tls_id, link_index, distance, chr(state_code)))
    return upcoming
_RETURN_VALUE_FUNC = {tc.VAR_SPEED: Storage.readDouble,
tc.VAR_SPEED_WITHOUT_TRACI: Storage.readDouble,
tc.VAR_POSITION: lambda result: result.read("!dd"),
tc.VAR_POSITION3D: lambda result: result.read("!ddd"),
tc.VAR_ANGLE: Storage.readDouble,
tc.VAR_ROAD_ID: Storage.readString,
tc.VAR_LANE_ID: Storage.readString,
tc.VAR_LANE_INDEX: Storage.readInt,
tc.VAR_TYPE: Storage.readString,
tc.VAR_ROUTE_ID: Storage.readString,
tc.VAR_ROUTE_INDEX: Storage.readInt,
tc.VAR_COLOR: lambda result: result.read("!BBBB"),
tc.VAR_LANEPOSITION: Storage.readDouble,
tc.VAR_CO2EMISSION: Storage.readDouble,
tc.VAR_COEMISSION: Storage.readDouble,
tc.VAR_HCEMISSION: Storage.readDouble,
tc.VAR_PMXEMISSION: Storage.readDouble,
tc.VAR_NOXEMISSION: Storage.readDouble,
tc.VAR_FUELCONSUMPTION: Storage.readDouble,
This file is part of SUMO.
SUMO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
"""
from __future__ import absolute_import
import struct
from .domain import Domain
from .storage import Storage
from . import constants as tc
# Decoder table: variable id (from `tc`) -> callable that reads the matching
# value from a result object. It is handed to Domain.__init__ by PoiDomain.
# Position is unpacked as two doubles ("!dd"), color as four unsigned
# bytes ("!BBBB"); the remaining entries use the Storage.read* methods.
_RETURN_VALUE_FUNC = {tc.ID_LIST: Storage.readStringList,
tc.ID_COUNT: Storage.readInt,
tc.VAR_TYPE: Storage.readString,
tc.VAR_POSITION: lambda result: result.read("!dd"),
tc.VAR_COLOR: lambda result: result.read("!BBBB")}
class PoiDomain(Domain):
    """Domain wrapper for the "poi" command family.

    Registers the POI get/set/subscribe command ids and the module's
    _RETURN_VALUE_FUNC decoder table with the Domain base class.
    """

    def __init__(self):
        Domain.__init__(self, "poi", tc.CMD_GET_POI_VARIABLE, tc.CMD_SET_POI_VARIABLE,
                        tc.CMD_SUBSCRIBE_POI_VARIABLE, tc.RESPONSE_SUBSCRIBE_POI_VARIABLE,
                        tc.CMD_SUBSCRIBE_POI_CONTEXT, tc.RESPONSE_SUBSCRIBE_POI_CONTEXT,
                        _RETURN_VALUE_FUNC)

    def getType(self, poiID):
        """getType(string) -> string

        Returns the (abstract) type of the poi.
        """
        # A docstring-only body would return None; query VAR_TYPE the same
        # way the other getters query their variable.
        return self._getUniversal(tc.VAR_TYPE, poiID)
def getPosition(self, detID):
    """getPosition(string) -> double

    Returns the starting position of the detector measured from the
    beginning of the lane in meters.

    The value is obtained by querying tc.VAR_POSITION for *detID*
    through the domain's generic _getUniversal lookup.
    """
    start_offset = self._getUniversal(tc.VAR_POSITION, detID)
    return start_offset
SUMO, Simulation of Urban MObility; see http://sumo.dlr.de/
Copyright (C) 2011-2017 DLR (http://www.dlr.de/) and contributors
This file is part of SUMO.
SUMO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
"""
from __future__ import absolute_import
from .domain import Domain
from .storage import Storage
from . import constants as tc
# Decoder table: variable id (from `tc`) -> callable that reads the matching
# value from a result object. It is handed to Domain.__init__ by
# JunctionDomain. Position is unpacked as two doubles ("!dd"); the shape is
# read with Storage.readShape.
_RETURN_VALUE_FUNC = {tc.VAR_POSITION: lambda result: result.read("!dd"),
tc.VAR_SHAPE: Storage.readShape}
class JunctionDomain(Domain):
    """Domain wrapper for the "junction" command family.

    Registers the junction get/set/subscribe command ids and the module's
    _RETURN_VALUE_FUNC decoder table with the Domain base class.
    """

    def __init__(self):
        Domain.__init__(self, "junction", tc.CMD_GET_JUNCTION_VARIABLE, tc.CMD_SET_JUNCTION_VARIABLE,
                        tc.CMD_SUBSCRIBE_JUNCTION_VARIABLE, tc.RESPONSE_SUBSCRIBE_JUNCTION_VARIABLE,
                        tc.CMD_SUBSCRIBE_JUNCTION_CONTEXT, tc.RESPONSE_SUBSCRIBE_JUNCTION_CONTEXT,
                        _RETURN_VALUE_FUNC)

    def getPosition(self, junctionID):
        """getPosition(string) -> (double, double)

        Returns the coordinates of the center of the junction.
        """
        # A docstring-only body would return None; VAR_POSITION is decoded
        # as a pair of doubles by this module's _RETURN_VALUE_FUNC table.
        return self._getUniversal(tc.VAR_POSITION, junctionID)
data = []
for i in range(nbData):
result.read("!B")
vehID = result.readString()
result.read("!B")
length = result.readDouble()
result.read("!B")
entryTime = result.readDouble()
result.read("!B")
leaveTime = result.readDouble()
result.read("!B")
typeID = result.readString()
data.append([vehID, length, entryTime, leaveTime, typeID])
return data
# Decoder table: variable id (from `tc`) -> callable that reads the matching
# value from a result object. Here VAR_POSITION is a single double (not a
# coordinate pair), and LAST_STEP_VEHICLE_DATA is decoded by the
# module-level readVehicleData function.
# NOTE(review): presumably consumed by InductionLoopDomain below - the
# constructor call is cut off before its last argument, so confirm.
_RETURN_VALUE_FUNC = {tc.VAR_POSITION: Storage.readDouble,
tc.VAR_LANE_ID: Storage.readString,
tc.LAST_STEP_VEHICLE_NUMBER: Storage.readInt,
tc.LAST_STEP_MEAN_SPEED: Storage.readDouble,
tc.LAST_STEP_VEHICLE_ID_LIST: Storage.readStringList,
tc.LAST_STEP_OCCUPANCY: Storage.readDouble,
tc.LAST_STEP_LENGTH: Storage.readDouble,
tc.LAST_STEP_TIME_SINCE_DETECTION: Storage.readDouble,
tc.LAST_STEP_VEHICLE_DATA: readVehicleData}
class InductionLoopDomain(Domain):
def __init__(self):
Domain.__init__(self, "inductionloop", tc.CMD_GET_INDUCTIONLOOP_VARIABLE, None,
tc.CMD_SUBSCRIBE_INDUCTIONLOOP_VARIABLE, tc.RESPONSE_SUBSCRIBE_INDUCTIONLOOP_VARIABLE,
tc.CMD_SUBSCRIBE_INDUCTIONLOOP_CONTEXT, tc.RESPONSE_SUBSCRIBE_INDUCTIONLOOP_CONTEXT,