Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
1 + 4 + # stageType
1 + 4 + \
sum(map(len, edges)) + 4 * len(edges) +
1 + 8 + # arrivalPos
1 + 4 + # duration
1 + 8 + # speed
1 + 4 + len(stopID)
)
self._connection._string += struct.pack("!Bi", tc.TYPE_COMPOUND, 6)
self._connection._string += struct.pack(
"!Bi", tc.TYPE_INTEGER, tc.STAGE_WALKING)
self._connection._packStringList(edges)
self._connection._string += struct.pack("!Bd",
tc.TYPE_DOUBLE, arrivalPos)
self._connection._string += struct.pack("!Bi",
tc.TYPE_INTEGER, duration)
self._connection._string += struct.pack("!Bd", tc.TYPE_DOUBLE, speed)
self._connection._packString(stopID)
self._connection._sendExact()
When setting begin time and end time (in seconds), the changes only
apply to that time range
"""
if begin is None and end is None:
self._connection._beginMessage(
tc.CMD_SET_EDGE_VARIABLE, tc.VAR_EDGE_EFFORT, edgeID, 1 + 4 + 1 + 8)
self._connection._string += struct.pack("!BiBd",
tc.TYPE_COMPOUND, 1, tc.TYPE_DOUBLE, effort)
self._connection._sendExact()
elif begin is not None and end is not None:
self._connection._beginMessage(
tc.CMD_SET_EDGE_VARIABLE, tc.VAR_EDGE_EFFORT, edgeID, 1 + 4 + 1 + 4 + 1 + 4 + 1 + 8)
self._connection._string += struct.pack("!BiBiBiBd",
tc.TYPE_COMPOUND, 3,
tc.TYPE_INTEGER, begin,
tc.TYPE_INTEGER, end,
tc.TYPE_DOUBLE, effort)
self._connection._sendExact()
else:
raise TraCIException(
"Both, begin time and end time must be specified")
def getLaneChangeState(self, vehID, direction):
"""getLaneChangeState(string, int) -> (int, int)
Return the lane change state for the vehicle
"""
self._connection._beginMessage(
tc.CMD_GET_VEHICLE_VARIABLE, tc.CMD_CHANGELANE, vehID, 1 + 4)
self._connection._string += struct.pack("!Bi", tc.TYPE_INTEGER, direction)
result = self._connection._checkResult(tc.CMD_GET_VEHICLE_VARIABLE, tc.CMD_CHANGELANE, vehID)
return result.read("!iBiBi")[2::2] # ignore num compounds and type int
def _sendIntCmd(self, cmdID, varID, objID, value):
self._beginMessage(cmdID, varID, objID, 1 + 4)
self._string += struct.pack("!Bi", tc.TYPE_INTEGER, value)
self._sendExact()
def setStop(self, vehID, edgeID, pos=1., laneIndex=0, duration=2**31 - 1,
flags=tc.STOP_DEFAULT, startPos=tc.INVALID_DOUBLE_VALUE, until=-1):
"""setStop(string, string, double, integer, integer, integer, double, integer) -> None
Adds or modifies a stop with the given parameters. The duration and the until attribute are
in milliseconds.
"""
self._connection._beginMessage(tc.CMD_SET_VEHICLE_VARIABLE, tc.CMD_STOP,
vehID, 1 + 4 + 1 + 4 + len(edgeID) + 1 + 8 + 1 + 1 + 1 + 4 + 1 + 1 + 1 + 8 + 1 + 4)
self._connection._string += struct.pack("!Bi", tc.TYPE_COMPOUND, 7)
self._connection._packString(edgeID)
self._connection._string += struct.pack("!BdBBBiBB", tc.TYPE_DOUBLE, pos,
tc.TYPE_BYTE, laneIndex, tc.TYPE_INTEGER, duration, tc.TYPE_BYTE, flags)
self._connection._string += struct.pack("!BdBi",
tc.TYPE_DOUBLE, startPos, tc.TYPE_INTEGER, until)
self._connection._sendExact()
def getEffort(self, vehID, time, edgeID):
"""getEffort(string, double, string) -> double
.
"""
self._connection._beginMessage(tc.CMD_GET_VEHICLE_VARIABLE,
tc.VAR_EDGE_EFFORT, vehID, 1 + 4 + 1 + 4 + 1 + 4 + len(edgeID))
self._connection._string += struct.pack(
"!BiBi", tc.TYPE_COMPOUND, 2, tc.TYPE_INTEGER, time)
self._connection._packString(edgeID)
return self._connection._checkResult(tc.CMD_GET_VEHICLE_VARIABLE, tc.VAR_EDGE_EFFORT, vehID).readDouble()
def setStop(self, vehID, edgeID, pos=1., laneIndex=0, duration=2**31 - 1,
flags=tc.STOP_DEFAULT, startPos=tc.INVALID_DOUBLE_VALUE, until=-1):
"""setStop(string, string, double, integer, integer, integer, double, integer) -> None
Adds or modifies a stop with the given parameters. The duration and the until attribute are
in milliseconds.
"""
self._connection._beginMessage(tc.CMD_SET_VEHICLE_VARIABLE, tc.CMD_STOP,
vehID, 1 + 4 + 1 + 4 + len(edgeID) + 1 + 8 + 1 + 1 + 1 + 4 + 1 + 1 + 1 + 8 + 1 + 4)
self._connection._string += struct.pack("!Bi", tc.TYPE_COMPOUND, 7)
self._connection._packString(edgeID)
self._connection._string += struct.pack("!BdBBBiBB", tc.TYPE_DOUBLE, pos,
tc.TYPE_BYTE, laneIndex, tc.TYPE_INTEGER, duration, tc.TYPE_BYTE, flags)
self._connection._string += struct.pack("!BdBi",
tc.TYPE_DOUBLE, startPos, tc.TYPE_INTEGER, until)
self._connection._sendExact()
def getAdaptedTraveltime(self, edgeID, time):
"""getAdaptedTraveltime(string, double) -> double
Returns the travel time value (in s) used for (re-)routing
which is valid on the edge at the given time.
"""
self._connection._beginMessage(tc.CMD_GET_EDGE_VARIABLE, tc.VAR_EDGE_TRAVELTIME,
edgeID, 1 + 4)
self._connection._string += struct.pack(
"!Bi", tc.TYPE_INTEGER, time)
return self._connection._checkResult(tc.CMD_GET_EDGE_VARIABLE,
tc.VAR_EDGE_TRAVELTIME, edgeID).readDouble()
def setAdaptedTraveltime(self, vehID, begTime, endTime, edgeID, time):
"""setAdaptedTraveltime(string, double, string, double) -> None
Inserts the information about the travel time of edge "edgeID" valid
from begin time to end time into the vehicle's internal edge weights
container. .
"""
self._connection._beginMessage(tc.CMD_SET_VEHICLE_VARIABLE, tc.VAR_EDGE_TRAVELTIME,
vehID, 1 + 4 + 1 + 4 + 1 + 4 + 1 + 4 + len(edgeID) + 1 + 8)
self._connection._string += struct.pack("!BiBiBi", tc.TYPE_COMPOUND, 4, tc.TYPE_INTEGER, begTime,
tc.TYPE_INTEGER, endTime)
self._connection._packString(edgeID)
self._connection._string += struct.pack("!Bd", tc.TYPE_DOUBLE, time)
self._connection._sendExact()
self._connection._string += struct.pack("!Bi",
tc.TYPE_COMPOUND, itemNo)
# programID
self._connection._packString(tls._subID)
# type
self._connection._string += struct.pack("!Bi", tc.TYPE_INTEGER, 0)
# subitems
self._connection._string += struct.pack("!Bi", tc.TYPE_COMPOUND, 0)
# index
self._connection._string += struct.pack("!Bi",
tc.TYPE_INTEGER, tls._currentPhaseIndex)
# phaseNo
self._connection._string += struct.pack("!Bi",
tc.TYPE_INTEGER, len(tls._phases))
for p in tls._phases:
self._connection._string += struct.pack("!BiBiBi", tc.TYPE_INTEGER,
p._duration, tc.TYPE_INTEGER, p._duration1, tc.TYPE_INTEGER, p._duration2)
self._connection._packString(p._phaseDef)
self._connection._sendExact()