Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def getElectricityConsumption(self, laneID):
    """getElectricityConsumption(string) -> double

    Returns the electricity consumption on the given lane for the last time step.

    NOTE(review): this docstring previously gave the unit as "ml", which is a
    volume unit and looks copied from a fuel consumption getter - confirm the
    unit actually reported by the simulation.
    """
    fetch = self._getUniversal
    return fetch(tc.VAR_ELECTRICITYCONSUMPTION, laneID)
def setColor(self, vehID, color):
    """setColor(string, (integer, integer, integer[, integer]))

    Sets the color for the vehicle with the given ID,
    e.g. (255, 0, 0) for the color red.
    Every component is converted with int() and packed as one unsigned byte,
    so it has to lie in the range 0..255.
    The fourth component (alpha) is optional and defaults to 255 when it is
    missing; it is only used when drawing vehicles with raster images.
    """
    # Convert all components before the message is started, so that an
    # invalid color raises without leaving a half-built message behind.
    red, green, blue = int(color[0]), int(color[1]), int(color[2])
    try:
        alpha = int(color[3])
    except IndexError:
        # plain RGB triple given: fall back to a fully opaque color
        alpha = 255
    # payload: one type byte followed by four color bytes
    self._connection._beginMessage(
        tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_COLOR, vehID, 1 + 1 + 1 + 1 + 1)
    self._connection._string += struct.pack(
        "!BBBBB", tc.TYPE_COLOR, red, green, blue, alpha)
    self._connection._sendExact()
def setStop(self, vehID, edgeID, pos=1., laneIndex=0, duration=2**31 - 1,
            flags=tc.STOP_DEFAULT, startPos=tc.INVALID_DOUBLE_VALUE, until=-1):
    """setStop(string, string, double, integer, integer, integer, double, integer) -> None

    Adds or modifies a stop with the given parameters.
    The duration and the until attribute are in milliseconds.
    """
    conn = self._connection
    # Size of the payload that follows the message header; every typed field
    # is one type byte plus its packed value (sizes match the struct formats
    # used below).
    payloadLength = (
        1 + 4                    # compound type byte + item count
        + 1 + 4 + len(edgeID)    # edge ID string (presumably type + length prefix + characters)
        + 1 + 8                  # pos (double)
        + 1 + 1                  # laneIndex (byte)
        + 1 + 4                  # duration (integer)
        + 1 + 1                  # flags (byte)
        + 1 + 8                  # startPos (double)
        + 1 + 4                  # until (integer)
    )
    conn._beginMessage(tc.CMD_SET_VEHICLE_VARIABLE, tc.CMD_STOP, vehID, payloadLength)
    # compound of seven items: edge, pos, lane, duration, flags, startPos, until
    conn._string += struct.pack("!Bi", tc.TYPE_COMPOUND, 7)
    conn._packString(edgeID)
    conn._string += struct.pack(
        "!BdBBBiBB",
        tc.TYPE_DOUBLE, pos,
        tc.TYPE_BYTE, laneIndex,
        tc.TYPE_INTEGER, duration,
        tc.TYPE_BYTE, flags)
    conn._string += struct.pack(
        "!BdBi",
        tc.TYPE_DOUBLE, startPos,
        tc.TYPE_INTEGER, until)
    conn._sendExact()
(at your option) any later version.
"""
from __future__ import absolute_import
import struct
from . import constants as tc
from .domain import Domain
from .storage import Storage
from .exceptions import *
def _TIME2STEPS(time):
"""Conversion from (float) time in seconds to milliseconds as int"""
return int(time * 1000)
_RETURN_VALUE_FUNC = {tc.VAR_EDGE_TRAVELTIME: Storage.readDouble,
tc.VAR_WAITING_TIME: Storage.readDouble,
tc.VAR_EDGE_EFFORT: Storage.readDouble,
tc.VAR_CO2EMISSION: Storage.readDouble,
tc.VAR_COEMISSION: Storage.readDouble,
tc.VAR_HCEMISSION: Storage.readDouble,
tc.VAR_PMXEMISSION: Storage.readDouble,
tc.VAR_NOXEMISSION: Storage.readDouble,
tc.VAR_FUELCONSUMPTION: Storage.readDouble,
tc.VAR_NOISEEMISSION: Storage.readDouble,
tc.VAR_ELECTRICITYCONSUMPTION: Storage.readDouble,
tc.LAST_STEP_MEAN_SPEED: Storage.readDouble,
tc.LAST_STEP_OCCUPANCY: Storage.readDouble,
tc.LAST_STEP_LENGTH: Storage.readDouble,
tc.VAR_CURRENT_TRAVELTIME: Storage.readDouble,
tc.LAST_STEP_VEHICLE_NUMBER: Storage.readInt,
tc.LAST_STEP_VEHICLE_HALTING_NUMBER: Storage.readInt,
def getLaneChangeState(self, vehID, direction):
    """getLaneChangeState(string, int) -> (int, int)

    Return the lane change state for the vehicle.
    The given direction is sent along with the request as an integer.
    """
    conn = self._connection
    # payload: one type byte plus a 4-byte integer carrying the direction
    conn._beginMessage(tc.CMD_GET_VEHICLE_VARIABLE, tc.CMD_CHANGELANE, vehID, 1 + 4)
    conn._string += struct.pack("!Bi", tc.TYPE_INTEGER, direction)
    response = conn._checkResult(tc.CMD_GET_VEHICLE_VARIABLE, tc.CMD_CHANGELANE, vehID)
    fields = response.read("!iBiBi")
    # field 0 is the number of compounds, fields 1 and 3 are integer type
    # markers; only the two state values at positions 2 and 4 are returned
    return fields[2::2]
SUMO, Simulation of Urban MObility; see http://sumo.dlr.de/
Copyright (C) 2011-2017 DLR (http://www.dlr.de/) and contributors
This file is part of SUMO.
SUMO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
"""
from __future__ import absolute_import
from .domain import Domain
from .storage import Storage
from . import constants as tc
# Maps each multi-entry/exit detector variable ID to the Storage reader for
# its value type. The table is handed to Domain.__init__ by
# MultiEntryExitDomain; presumably it is used there to decode responses -
# confirm against the Domain implementation.
_RETURN_VALUE_FUNC = {tc.LAST_STEP_VEHICLE_NUMBER: Storage.readInt,
tc.LAST_STEP_MEAN_SPEED: Storage.readDouble,
tc.LAST_STEP_VEHICLE_ID_LIST: Storage.readStringList,
tc.LAST_STEP_VEHICLE_HALTING_NUMBER: Storage.readInt}
class MultiEntryExitDomain(Domain):
def __init__(self):
    """Initializes the domain named "multientryexit" with its command constants.

    Passes the get, variable subscription and context subscription command
    IDs (with their response IDs) and the return value table on to Domain.
    NOTE(review): None fills the slot right after the get command -
    presumably the set command, since no set constant is used here; confirm
    against Domain.__init__.
    """
    domainArgs = (
        "multientryexit",
        tc.CMD_GET_MULTIENTRYEXIT_VARIABLE,
        None,
        tc.CMD_SUBSCRIBE_MULTIENTRYEXIT_VARIABLE,
        tc.RESPONSE_SUBSCRIBE_MULTIENTRYEXIT_VARIABLE,
        tc.CMD_SUBSCRIBE_MULTIENTRYEXIT_CONTEXT,
        tc.RESPONSE_SUBSCRIBE_MULTIENTRYEXIT_CONTEXT,
        _RETURN_VALUE_FUNC,
    )
    Domain.__init__(self, *domainArgs)
def getLastStepVehicleNumber(self, detID):
"""getLastStepVehicleNumber(string) -> integer
Returns the number of vehicles that have been within the named multi-entry/multi-exit detector within the last simulation step.
def getRemainingStages(self, personID):
    """getRemainingStages(string) -> int

    Returns the number of remaining stages (at least 1) for the person
    with the given ID.
    """
    fetch = self._getUniversal
    return fetch(tc.VAR_STAGES_REMAINING, personID)
def getLastStepVehicleIDs(self, detID):
    """getLastStepVehicleIDs(string) -> list(string)

    Returns the list of ids of vehicles that were on the named detector
    in the last simulation step.
    """
    fetch = self._getUniversal
    return fetch(tc.LAST_STEP_VEHICLE_ID_LIST, detID)