Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
## DiagnosticSessionControl
if req.service == services.DiagnosticSessionControl:
if sessions.from_id(req.subfunction) == sessions.ExtendedDiagnosticSession:
response = Response(req.service, Response.Code.PositiveResponse)
else:
response = Response(req.service, Response.Code.SubFunctionNotSupported)
## SecurityAccess
elif req.service == services.SecurityAccess:
if req.subfunction == 3:
response = Response(req.service, Response.Code.PositiveResponse, data=b"\x12\x34\x56\x78")
elif req.subfunction == 4:
if req.data == b"\xed\xcb\xa9\x87":
response = Response(req.service, Response.Code.PositiveResponse)
else:
response = Response(req.service, Response.Code.InvalidKey)
else:
response = Response(req.service, Response.Code.SubFunctionNotSupported)
## Tester present
elif req.service == services.TesterPresent:
response = Response(req.service, Response.Code.PositiveResponse)
## Read Data By identifier
elif req.service == services.ReadDataByIdentifier:
if req.data == b"\x00\x01" :
response = Response(req.service, Response.Code.PositiveResponse, data=b'\x12\x34')
elif req.data == b"\x00\x02" :
response = Response(req.service, Response.Code.PositiveResponse, data=b'\x56\x78')
elif req.data == b"\x00\x03" :
response = Response(req.service, Response.Code.PositiveResponse, data=b'\x9a\xbc')
elif req.data == b"\x00\x01\x00\x02" :
def test_from_input_param(self):
with self.assertRaises(ValueError):
response = Response(service = "a string")
with self.assertRaises(ValueError):
response = Response(service = RandomClass())
with self.assertRaises(ValueError):
response = Response(service=DummyServiceNormal(), code = "string")
with self.assertRaises(ValueError):
response = Response(service=DummyServiceNormal(), code = -1)
with self.assertRaises(ValueError):
response = Response(service=DummyServiceNormal(), code = 0x100)
with self.assertRaises(ValueError):
response = Response(service=DummyServiceNormal(), code = 0x10, data=11)
response = Response(service = "a string")
with self.assertRaises(ValueError):
response = Response(service = RandomClass())
with self.assertRaises(ValueError):
response = Response(service=DummyServiceNormal(), code = "string")
with self.assertRaises(ValueError):
response = Response(service=DummyServiceNormal(), code = -1)
with self.assertRaises(ValueError):
response = Response(service=DummyServiceNormal(), code = 0x100)
with self.assertRaises(ValueError):
response = Response(service=DummyServiceNormal(), code = 0x10, data=11)
## Write Data By identifier
elif req.service == services.WriteDataByIdentifier:
if req.data[0:2] in [b"\x00\x01", b"\x00\x02", b"\x00\x03"] :
response = Response(req.service, Response.Code.PositiveResponse, req.data[0:2])
else:
response = Response(req.service, Response.Code.RequestOutOfRange)
elif req.service == services.ECUReset:
if req.subfunction in [1,2] :
response = Response(req.service, Response.Code.PositiveResponse, data=struct.pack('B', req.subfunction))
else:
response = Response(req.service, Response.Code.RequestOutOfRange)
else:
response = Response(req.service, Response.Code.ServiceNotSupported)
if not response.positive or not req.suppress_positive_response:
print("Sending: " + ''.join(['%02X' % b for b in response.get_payload()]))
conn.send(response)
else:
print ("Suppressing positive response.")
def test_make_payload_basic_negative(self):
response = Response(DummyServiceNormal(), code = 0x10) # General Reject
self.assertFalse(response.positive)
self.assertTrue(response.valid)
payload = response.get_payload()
self.assertEqual(b"\x7F\x13\x10", payload) # Original ID + 0x40. 7F indicate negative
def test_from_input_param(self):
with self.assertRaises(ValueError):
response = Response(service = "a string")
with self.assertRaises(ValueError):
response = Response(service = RandomClass())
with self.assertRaises(ValueError):
response = Response(service=DummyServiceNormal(), code = "string")
with self.assertRaises(ValueError):
response = Response(service=DummyServiceNormal(), code = -1)
with self.assertRaises(ValueError):
response = Response(service=DummyServiceNormal(), code = 0x100)
with self.assertRaises(ValueError):
response = Response(service=DummyServiceNormal(), code = 0x10, data=11)
import time
conn = Connection('vcan0', rxid=0x456, txid=0x123)
with conn.open():
while True:
payload = conn.wait_frame(timeout=None)
if payload is not None:
print("Received: " + ''.join(['%02X' % b for b in payload]))
req = Request.from_payload(payload)
response = Response(req.service, Response.Code.GeneralReject)
## DiagnosticSessionControl
if req.service == services.DiagnosticSessionControl:
if sessions.from_id(req.subfunction) == sessions.ExtendedDiagnosticSession:
response = Response(req.service, Response.Code.PositiveResponse)
else:
response = Response(req.service, Response.Code.SubFunctionNotSupported)
## SecurityAccess
elif req.service == services.SecurityAccess:
if req.subfunction == 3:
response = Response(req.service, Response.Code.PositiveResponse, data=b"\x12\x34\x56\x78")
elif req.subfunction == 4:
if req.data == b"\xed\xcb\xa9\x87":
response = Response(req.service, Response.Code.PositiveResponse)
else:
response = Response(req.service, Response.Code.InvalidKey)
else:
response = Response(req.service, Response.Code.SubFunctionNotSupported)
## Tester present
elif req.service == services.TesterPresent:
response = Response(req.service, Response.Code.PositiveResponse)
def test_overall_timeout_pending_response(self):
if not hasattr(self, 'completed'):
self.completed = False
self.conn.touserqueue.get(timeout=0.2)
response = Response(service=services.TesterPresent, code=Response.Code.RequestCorrectlyReceived_ResponsePending)
t1 = time.time()
while time.time() - t1 < 1 and not self.completed:
self.conn.fromuserqueue.put(response.get_payload())
time.sleep(0.1)
response = Response.from_payload(payload)
self.last_response = response
self.logger.debug("Received response from server")
if not response.valid:
raise InvalidResponseException(response)
if response.service.response_id() != request.service.response_id():
msg = "Response gotten from server has a service ID different than the request service ID. Received=0x%02x, Expected=0x%02x" % (response.service.response_id() , request.service.response_id() )
raise UnexpectedResponseException(response, msg)
if not response.positive:
if not request.service.is_supported_negative_response(response.code):
self.logger.warning('Given response code "%s" (0x%02x) is not a supported negative response code according to UDS standard.' % (response.code_name, response.code))
if response.code == Response.Code.RequestCorrectlyReceived_ResponsePending:
done_receiving = False
if not using_p2_star:
# Received a 0x78 NRC: timeout is now set to P2*
single_request_timeout = self.config['p2_star_timeout']
using_p2_star = True
self.logger.debug("Server requested to wait with response code %s (0x%02x), single request timeout is now set to P2* (%.3f seconds)" % (response.code_name, response.code, single_request_timeout))
else:
raise NegativeResponseException(response)
self.logger.info('Received positive response for service %s (0x%02x) from server.' % (response.service.get_name(), response.service.request_id()))
return response