Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _test_set_params_invalidservice_no_exception(self):
    """Check that the response is reported as not valid instead of raising.

    The client option ``exception_on_invalid_response`` is switched off
    before the request is sent, then the ``valid`` attribute of the
    returned response is asserted to be false.
    """
    self.udsclient.config['exception_on_invalid_response'] = False
    request_args = {
        'control_type': services.CommunicationControl.ControlType.disableRxAndTx,
        'communication_type': CommunicationType(subnet=5, normal_msg=True, network_management_msg=True),
    }
    reply = self.udsclient.communication_control(**request_args)
    self.assertFalse(reply.valid)
def _test_comcontrol_enable_node_spr(self):
    """Check that the request returns ``None`` under ``suppress_positive_response``.

    The request uses the ``enableRxAndTx`` control type and a
    communication type built for the ``node`` subnet.
    """
    request_args = dict(
        control_type=services.CommunicationControl.ControlType.enableRxAndTx,
        communication_type=CommunicationType(subnet=CommunicationType.Subnet.node, normal_msg=True),
    )
    with self.udsclient.suppress_positive_response:
        reply = self.udsclient.communication_control(**request_args)
        self.assertEqual(reply, None)
    # Avoid closing connection prematurely
    self.conn.fromuserqueue.get(timeout=0.2)
def _test_comcontrol_bad_control_type_no_exception(self):
    """Check that the response is flagged as both valid and unexpected.

    The client option ``exception_on_unexpected_response`` is switched off
    first, so the result is inspected through the ``valid`` and
    ``unexpected`` attributes of the returned response.
    """
    self.udsclient.config['exception_on_unexpected_response'] = False
    reply = self.udsclient.communication_control(
        control_type=9,
        communication_type=CommunicationType(subnet=3, normal_msg=True, network_management_msg=True),
    )
    self.assertTrue(reply.valid)
    self.assertTrue(reply.unexpected)
def test_str_repr(self):
    """Smoke test: converting a CommunicationType to text must not raise."""
    communication_type = CommunicationType(
        subnet=CommunicationType.Subnet.node,
        normal_msg=True,
        network_management_msg=False,
    )
    # The results are deliberately discarded; only the absence of an
    # exception is being checked here.
    str(communication_type)
    communication_type.__repr__()
def test_make(self):
    """Check the byte produced by ``get_byte`` for several CommunicationType settings."""
    # Each entry pairs constructor keyword arguments with the expected byte.
    cases = (
        (dict(subnet=CommunicationType.Subnet.node, normal_msg=True, network_management_msg=False), b'\x01'),
        (dict(subnet=CommunicationType.Subnet.network, normal_msg=True, network_management_msg=False), b'\xF1'),
        (dict(subnet=3, normal_msg=True, network_management_msg=True), b'\x33'),
    )
    for constructor_kwargs, expected_byte in cases:
        communication_type = CommunicationType(**constructor_kwargs)
        self.assertEqual(communication_type.get_byte(), expected_byte)
def _test_comcontrol_negative_response_exception(self):
    """Check that ``communication_control`` raises ``NegativeResponseException``.

    The request uses the ``disableRxAndTx`` control type and a
    communication type built with ``subnet=3`` and both message flags set.
    """
    control_type = services.CommunicationControl.ControlType.disableRxAndTx
    com_type = CommunicationType(subnet=3, normal_msg=True, network_management_msg=True)
    # Only the request itself sits inside assertRaises, so an error raised
    # while building the arguments can never satisfy the assertion.
    with self.assertRaises(NegativeResponseException):
        self.udsclient.communication_control(control_type=control_type, communication_type=com_type)
def _test_bad_param(self):
    """Check that ``communication_control`` raises ``ValueError`` for each bad argument set."""
    valid_com_type = CommunicationType(subnet=3, normal_msg=True, network_management_msg=True)
    # The first three entries vary control_type; the last passes a string
    # as communication_type.
    rejected_calls = (
        dict(control_type='x', communication_type=valid_com_type),
        dict(control_type=0x80, communication_type=valid_com_type),
        dict(control_type=-1, communication_type=valid_com_type),
        dict(control_type=0, communication_type='x'),
    )
    for call_kwargs in rejected_calls:
        with self.assertRaises(ValueError):
            self.udsclient.communication_control(**call_kwargs)
def _test_comcontrol_enable_node(self):
    """Check a positive response for each accepted form of the communication type.

    The same request is sent three times, passing the communication type
    as a ``CommunicationType`` object, as the result of ``get_byte()`` and
    as the result of ``get_byte_as_int()``. Each response must be positive
    and echo the requested control type.
    """
    control_type = services.CommunicationControl.ControlType.enableRxAndTx
    com_type = CommunicationType(subnet=CommunicationType.Subnet.node, normal_msg=True)

    def send_and_verify(communication_type):
        # Issue one request and run the two assertions shared by every form.
        reply = self.udsclient.communication_control(control_type=control_type, communication_type=communication_type)
        self.assertTrue(reply.positive)
        self.assertEqual(reply.service_data.control_type_echo, control_type)

    send_and_verify(com_type)
    send_and_verify(com_type.get_byte())
    send_and_verify(com_type.get_byte_as_int())
def normalize_communication_type(self, communication_type):
    """Return ``communication_type`` as a ``CommunicationType`` object.

    :param communication_type: A ``CommunicationType`` object, or an ``int``
        or ``bytes`` value that is converted with
        ``CommunicationType.from_byte``.
    :return: The object that was passed in when it already is a
        ``CommunicationType``, otherwise the result of ``from_byte``.
    :raises ValueError: If ``communication_type`` is none of the accepted types.
    """
    # Imported inside the function, as in the original code; presumably this
    # avoids a circular import with the udsoncan package -- confirm.
    from udsoncan import CommunicationType

    # Raw encodings are checked first so the conversion precedence matches
    # the original implementation.
    if isinstance(communication_type, (int, bytes)):
        return CommunicationType.from_byte(communication_type)

    if isinstance(communication_type, CommunicationType):
        return communication_type

    # The message lists every accepted type; bytes was previously omitted.
    raise ValueError('communication_type must either be a CommunicationType object, an integer or a bytes object')