Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
This file is part of SUMO.
SUMO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
"""
from __future__ import absolute_import
import struct
from .domain import Domain
from .storage import Storage
from . import constants as tc
_RETURN_VALUE_FUNC = {tc.VAR_VIEW_ZOOM: Storage.readDouble,
tc.VAR_VIEW_OFFSET: lambda result: result.read("!dd"),
tc.VAR_VIEW_SCHEMA: Storage.readString,
tc.VAR_VIEW_BOUNDARY: lambda result: (result.read("!dd"), result.read("!dd"))}
class GuiDomain(Domain):
DEFAULT_VIEW = 'View #0'
def __init__(self):
Domain.__init__(self, "gui", tc.CMD_GET_GUI_VARIABLE, tc.CMD_SET_GUI_VARIABLE,
tc.CMD_SUBSCRIBE_GUI_VARIABLE, tc.RESPONSE_SUBSCRIBE_GUI_VARIABLE,
tc.CMD_SUBSCRIBE_GUI_CONTEXT, tc.RESPONSE_SUBSCRIBE_GUI_CONTEXT,
_RETURN_VALUE_FUNC)
def getZoom(self, viewID=DEFAULT_VIEW):
"""getZoom(string): -> double
Returns the current zoom factor.
Copyright (C) 2011-2017 DLR (http://www.dlr.de/) and contributors
This file is part of SUMO.
SUMO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
"""
from __future__ import absolute_import
import struct
from .domain import Domain
from .storage import Storage
from . import constants as tc
_RETURN_VALUE_FUNC = {tc.ID_LIST: Storage.readStringList,
tc.ID_COUNT: Storage.readInt,
tc.VAR_SPEED: Storage.readDouble,
tc.VAR_POSITION: lambda result: result.read("!dd"),
tc.VAR_ANGLE: Storage.readDouble,
tc.VAR_ROAD_ID: Storage.readString,
tc.VAR_TYPE: Storage.readString,
tc.VAR_ROUTE_ID: Storage.readString,
tc.VAR_COLOR: lambda result: result.read("!BBBB"),
tc.VAR_LANEPOSITION: Storage.readDouble,
tc.VAR_LENGTH: Storage.readDouble,
tc.VAR_WAITING_TIME: Storage.readDouble,
tc.VAR_WIDTH: Storage.readDouble,
tc.VAR_MINGAP: Storage.readDouble,
tc.VAR_NEXT_EDGE: Storage.readString,
tc.VAR_STAGE: Storage.readInt,
tc.VAR_STAGES_REMAINING: Storage.readInt,
_RETURN_VALUE_FUNC = {tc.VAR_EDGE_TRAVELTIME: Storage.readDouble,
tc.VAR_WAITING_TIME: Storage.readDouble,
tc.VAR_EDGE_EFFORT: Storage.readDouble,
tc.VAR_CO2EMISSION: Storage.readDouble,
tc.VAR_COEMISSION: Storage.readDouble,
tc.VAR_HCEMISSION: Storage.readDouble,
tc.VAR_PMXEMISSION: Storage.readDouble,
tc.VAR_NOXEMISSION: Storage.readDouble,
tc.VAR_FUELCONSUMPTION: Storage.readDouble,
tc.VAR_NOISEEMISSION: Storage.readDouble,
tc.VAR_ELECTRICITYCONSUMPTION: Storage.readDouble,
tc.LAST_STEP_MEAN_SPEED: Storage.readDouble,
tc.LAST_STEP_OCCUPANCY: Storage.readDouble,
tc.LAST_STEP_LENGTH: Storage.readDouble,
tc.VAR_CURRENT_TRAVELTIME: Storage.readDouble,
tc.LAST_STEP_VEHICLE_NUMBER: Storage.readInt,
tc.LAST_STEP_VEHICLE_HALTING_NUMBER: Storage.readInt,
tc.LAST_STEP_VEHICLE_ID_LIST: Storage.readStringList,
tc.LAST_STEP_PERSON_ID_LIST: Storage.readStringList,
}
class EdgeDomain(Domain):
def __init__(self):
Domain.__init__(self, "edge", tc.CMD_GET_EDGE_VARIABLE, tc.CMD_SET_EDGE_VARIABLE,
tc.CMD_SUBSCRIBE_EDGE_VARIABLE, tc.RESPONSE_SUBSCRIBE_EDGE_VARIABLE,
tc.CMD_SUBSCRIBE_EDGE_CONTEXT, tc.RESPONSE_SUBSCRIBE_EDGE_CONTEXT,
_RETURN_VALUE_FUNC)
SUMO, Simulation of Urban MObility; see http://sumo.dlr.de/
Copyright (C) 2011-2017 DLR (http://www.dlr.de/) and contributors
This file is part of SUMO.
SUMO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
"""
from __future__ import absolute_import
from . import constants as tc
from .domain import Domain
from .storage import Storage
_RETURN_VALUE_FUNC = {tc.JAM_LENGTH_METERS: Storage.readDouble,
tc.JAM_LENGTH_VEHICLE: Storage.readInt,
tc.LAST_STEP_MEAN_SPEED: Storage.readDouble,
tc.VAR_POSITION: Storage.readDouble,
tc.VAR_LENGTH: Storage.readDouble,
tc.VAR_LANE_ID: Storage.readString,
tc.LAST_STEP_VEHICLE_ID_LIST: Storage.readStringList,
tc.LAST_STEP_VEHICLE_NUMBER: Storage.readInt,
tc.LAST_STEP_OCCUPANCY: Storage.readDouble,
tc.LAST_STEP_VEHICLE_HALTING_NUMBER: Storage.readInt}
class LaneAreaDomain(Domain):
def __init__(self, name="lanearea", deprecatedFor=None):
Domain.__init__(self, name, tc.CMD_GET_LANEAREA_VARIABLE, None,
tc.CMD_SUBSCRIBE_LANEAREA_VARIABLE, tc.RESPONSE_SUBSCRIBE_LANEAREA_VARIABLE,
for j in range(nbControlledLinks):
result.read("!B") # Type of Link j
link = result.readStringList() # Link j
controlledLinks.append(link)
signals.append(controlledLinks)
return signals
_RETURN_VALUE_FUNC = {tc.TL_RED_YELLOW_GREEN_STATE: Storage.readString,
tc.TL_COMPLETE_DEFINITION_RYG: _readLogics,
tc.TL_CONTROLLED_LANES: Storage.readStringList,
tc.TL_CONTROLLED_LINKS: _readLinks,
tc.TL_CURRENT_PROGRAM: Storage.readString,
tc.TL_CURRENT_PHASE: Storage.readInt,
tc.TL_NEXT_SWITCH: Storage.readInt,
tc.TL_PHASE_DURATION: Storage.readInt}
class TrafficLightsDomain(Domain):
Phase = Phase
Logic = Logic
def __init__(self):
Domain.__init__(self, "trafficlights", tc.CMD_GET_TL_VARIABLE, tc.CMD_SET_TL_VARIABLE,
tc.CMD_SUBSCRIBE_TL_VARIABLE, tc.RESPONSE_SUBSCRIBE_TL_VARIABLE,
tc.CMD_SUBSCRIBE_TL_CONTEXT, tc.RESPONSE_SUBSCRIBE_TL_CONTEXT,
_RETURN_VALUE_FUNC)
def getRedYellowGreenState(self, tlsID):
"""getRedYellowGreenState(string) -> string
SUMO, Simulation of Urban MObility; see http://sumo.dlr.de/
Copyright (C) 2011-2017 DLR (http://www.dlr.de/) and contributors
This file is part of SUMO.
SUMO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
"""
from __future__ import absolute_import
from .domain import Domain
from .storage import Storage
from . import constants as tc
_RETURN_VALUE_FUNC = {tc.LAST_STEP_VEHICLE_NUMBER: Storage.readInt,
tc.LAST_STEP_MEAN_SPEED: Storage.readDouble,
tc.LAST_STEP_VEHICLE_ID_LIST: Storage.readStringList,
tc.LAST_STEP_VEHICLE_HALTING_NUMBER: Storage.readInt}
class MultiEntryExitDomain(Domain):
def __init__(self):
Domain.__init__(self, "multientryexit", tc.CMD_GET_MULTIENTRYEXIT_VARIABLE, None,
tc.CMD_SUBSCRIBE_MULTIENTRYEXIT_VARIABLE, tc.RESPONSE_SUBSCRIBE_MULTIENTRYEXIT_VARIABLE,
tc.CMD_SUBSCRIBE_MULTIENTRYEXIT_CONTEXT, tc.RESPONSE_SUBSCRIBE_MULTIENTRYEXIT_CONTEXT,
_RETURN_VALUE_FUNC)
def getLastStepVehicleNumber(self, detID):
"""getLastStepVehicleNumber(string) -> integer
def __init__(self, name, cmdGetID, cmdSetID,
subscribeID, subscribeResponseID,
contextID, contextResponseID,
retValFunc, deprecatedFor=None):
self._name = name
self._cmdGetID = cmdGetID
self._cmdSetID = cmdSetID
self._subscribeID = subscribeID
self._subscribeResponseID = subscribeResponseID
self._contextID = contextID
self._contextResponseID = contextResponseID
self._retValFunc = {tc.ID_LIST: Storage.readStringList,
tc.ID_COUNT: Storage.readInt}
self._retValFunc.update(retValFunc)
self._deprecatedFor = deprecatedFor
self._connection = None
_defaultDomains.append(self)
setattr(traci, name, self)
def _recvExact(self):
try:
result = bytes()
while len(result) < 4:
t = self._socket.recv(4 - len(result))
if not t:
return None
result += t
length = struct.unpack("!i", result)[0] - 4
result = bytes()
while len(result) < length:
t = self._socket.recv(length - len(result))
if not t:
return None
result += t
return Storage(result)
except socket.error:
return None
def _sendExact(self):
if _embedded:
result = Storage(traciemb.execute(self._string))
else:
length = struct.pack("!i", len(self._string) + 4)
self._socket.send(length + self._string)
result = self._recvExact()
if not result:
self._socket.close()
del self._socket
raise FatalTraCIError("connection closed by SUMO")
for command in self._queue:
prefix = result.read("!BBB")
err = result.readString()
if prefix[2] or err:
self._string = bytes()
self._queue = []
raise TraCIException(prefix[1], _RESULTS[prefix[2]], err)
elif prefix[1] != command: