Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _parse(self, varID, data):
    """Decode ``data`` with the parser registered for ``varID``.

    The parser is looked up in ``self._valueFunc`` and called with
    ``data`` as its only argument; its return value is passed through
    unchanged.

    Raises:
        FatalTraCIError: if ``varID`` has no entry in ``self._valueFunc``.
    """
    if varID not in self._valueFunc:
        raise FatalTraCIError("Unknown variable %02x." % varID)
    return self._valueFunc[varID](data)
"""
Establish a connection to a TraCI-Server and return the
connection object. The connection is not saved in the pool and not
accessible via traci.switch. It should be safe to use different
connections established by this method in different threads.
"""
for wait in range(1, numRetries + 2):
try:
return Connection(host, port, proc)
except socket.error as e:
print("Could not connect to TraCI server at %s:%s" %
(host, port), e)
if wait < numRetries + 1:
print(" Retrying in %s seconds" % wait)
time.sleep(wait)
raise FatalTraCIError(str(e))
isVariableSubscription = response >= tc.RESPONSE_SUBSCRIBE_INDUCTIONLOOP_VARIABLE and response <= tc.RESPONSE_SUBSCRIBE_PERSON_VARIABLE
objectID = result.readString()
if not isVariableSubscription:
domain = result.read("!B")[0]
numVars = result.read("!B")[0]
if isVariableSubscription:
while numVars > 0:
varID = result.read("!B")[0]
status, varType = result.read("!BB")
if status:
print("Error!", result.readString())
elif response in self._subscriptionMapping:
self._subscriptionMapping[response].add(
objectID, varID, result)
else:
raise FatalTraCIError(
"Cannot handle subscription response %02x for %s." % (response, objectID))
numVars -= 1
else:
objectNo = result.read("!i")[0]
for o in range(objectNo):
oid = result.readString()
if numVars == 0:
self._subscriptionMapping[response].addContext(
objectID, self._subscriptionMapping[domain], oid)
for v in range(numVars):
varID = result.read("!B")[0]
status, varType = result.read("!BB")
if status:
print("Error!", result.readString())
elif response in self._subscriptionMapping:
self._subscriptionMapping[response].addContext(
def _sendExact(self):
    """Send the queued command string and check the per-command status.

    In embedded mode the command string is handed to ``traciemb.execute``
    and the reply wrapped in a ``Storage``; otherwise the string is written
    to ``self._socket`` behind a 4-byte big-endian length prefix and the
    reply is read with ``self._recvExact()``.

    For every command id in ``self._queue`` one status record is consumed
    from the reply: three unsigned bytes followed by a string.

    Returns:
        The reply object, positioned after the status records.

    Raises:
        FatalTraCIError: if the reply is empty, or if a status record
            names a different command than the one queued.
        TraCIException: if a status record carries a non-zero result
            byte or a non-empty error string.
    """
    if _embedded:
        result = Storage(traciemb.execute(self._string))
    else:
        # The prefix value is the payload length plus 4 (presumably the
        # size of the prefix itself -- confirm against the protocol).
        length = struct.pack("!i", len(self._string) + 4)
        # sendall() keeps writing until the whole frame has been handed to
        # the OS; plain send() may transmit only part of it.
        self._socket.sendall(length + self._string)
        result = self._recvExact()
    # NOTE(review): indentation was reconstructed; this check is applied to
    # both branches and assumes self._socket exists in embedded mode too.
    if not result:
        self._socket.close()
        del self._socket
        raise FatalTraCIError("connection closed by SUMO")
    for command in self._queue:
        prefix = result.read("!BBB")
        err = result.readString()
        if prefix[2] or err:
            # Drop the pending batch before reporting the failure.
            self._string = bytes()
            self._queue = []
            raise TraCIException(prefix[1], _RESULTS[prefix[2]], err)
        elif prefix[1] != command:
            raise FatalTraCIError("Received answer %s for command %s." % (prefix[1],
                                                                          command))
        elif prefix[1] == tc.CMD_STOP:
            # Skip the rest of this reply: one length byte, then
            # (length - 1) padding bytes.
            length = result.read("!B")[0] - 1
            result.read("!%sx" % length)
    self._string = bytes()
    self._queue = []
    return result
length = struct.pack("!i", len(self._string) + 4)
self._socket.send(length + self._string)
result = self._recvExact()
if not result:
self._socket.close()
del self._socket
raise FatalTraCIError("connection closed by SUMO")
for command in self._queue:
prefix = result.read("!BBB")
err = result.readString()
if prefix[2] or err:
self._string = bytes()
self._queue = []
raise TraCIException(prefix[1], _RESULTS[prefix[2]], err)
elif prefix[1] != command:
raise FatalTraCIError("Received answer %s for command %s." % (prefix[1],
command))
elif prefix[1] == tc.CMD_STOP:
length = result.read("!B")[0] - 1
result.read("!%sx" % length)
self._string = bytes()
self._queue = []
return result
objectNo = result.read("!i")[0]
for o in range(objectNo):
oid = result.readString()
if numVars == 0:
self._subscriptionMapping[response].addContext(
objectID, self._subscriptionMapping[domain], oid)
for v in range(numVars):
varID = result.read("!B")[0]
status, varType = result.read("!BB")
if status:
print("Error!", result.readString())
elif response in self._subscriptionMapping:
self._subscriptionMapping[response].addContext(
objectID, self._subscriptionMapping[domain], oid, varID, result)
else:
raise FatalTraCIError(
"Cannot handle subscription response %02x for %s." % (response, objectID))
return objectID, response
def getVersion(self):
    """Send a CMD_GETVERSION request and return the server's answer.

    The command id is appended to ``self._queue`` and a two-byte message
    (length byte, command id) to ``self._string`` before the batch is
    sent with ``self._sendExact()``.

    Returns:
        A 2-tuple ``(int, str)`` read from the reply in that order
        (presumably the API version number and a version description --
        confirm against the server documentation).

    Raises:
        FatalTraCIError: if the reply's command byte differs from
            ``tc.CMD_GETVERSION``.
    """
    command = tc.CMD_GETVERSION
    self._queue.append(command)
    # Message is two bytes long: the length byte itself plus the command id.
    self._string += struct.pack("!BB", 1 + 1, command)
    result = self._sendExact()
    result.readLength()
    response = result.read("!B")[0]
    if response != command:
        raise FatalTraCIError(
            "Received answer %s for command %s." % (response, command))
    return result.readInt(), result.readString()