Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Checks the default state of a Dtc built from only an ID: the ID is stored as given,
# the status and severity bytes are zero, and every individual status flag is False.
def test_init(self):
dtc = Dtc(0x1234)
self.assertEqual(dtc.id, 0x1234 )
# Status and severity are each checked in their bytes form and their integer form.
self.assertEqual(dtc.status.get_byte(), b'\x00')
self.assertEqual(dtc.status.get_byte_as_int(), 0x00)
self.assertEqual(dtc.severity.get_byte(), b'\x00')
self.assertEqual(dtc.severity.get_byte_as_int(), 0x00)
# None of the individual status flags may be set on a new instance.
self.assertEqual(dtc.status.test_failed, False)
self.assertEqual(dtc.status.test_failed_this_operation_cycle, False)
self.assertEqual(dtc.status.pending, False)
self.assertEqual(dtc.status.confirmed, False)
self.assertEqual(dtc.status.test_not_completed_since_last_clear, False)
self.assertEqual(dtc.status.test_failed_since_last_clear, False)
self.assertEqual(dtc.status.test_not_completed_this_operation_cycle, False)
self.assertEqual(dtc.status.warning_indicator_requested, False)
# NOTE(review): the statement expected to raise TypeError is not visible in this excerpt;
# the body of this `with` block is missing here.
with self.assertRaises(TypeError):
def _test_normal_behaviour_param_instance(self):
    """Call ``get_dtc_severity`` on the client with a ``Dtc`` instance (ID 0x123456)."""
    target_dtc = Dtc(0x123456)
    self.udsclient.get_dtc_severity(target_dtc)
def _test_single_data_instance_param(self):
    """Invoke the client method named by ``self.client_function`` with a ``Dtc`` instance.

    The method is looked up on ``self.udsclient`` by name and called with
    DTC 0x123456, record number 0x99 and a data size of 5. The response is
    then handed to ``self.assert_single_data_response``.
    """
    client_method = getattr(self.udsclient, self.client_function)
    # Call the looked-up method directly; spelling out ``.__call__`` adds nothing.
    response = client_method(dtc=Dtc(0x123456), record_number=0x99, data_size=5)
    self.assert_single_data_response(response)
def _test_normal_behaviour_param_int(self):
    """Query the DTC count with integer status/severity masks and check the decoded reply.

    The response must report the ISO14229_1 DTC format and a DTC count of 0x1234.
    """
    masks = {'status_mask': 0x01, 'severity_mask': 0xC0}
    response = self.udsclient.get_number_of_dtc_by_status_severity_mask(**masks)
    service_data = response.service_data
    self.assertEqual(service_data.dtc_format, Dtc.Format.ISO14229_1)
    self.assertEqual(service_data.dtc_count, 0x1234)
def _test_normal_behaviour_param_instance(self):
    """Query the DTC count with ``Dtc.Status`` / ``Dtc.Severity`` objects as the masks.

    The response must report the ISO14229_1 DTC format and a DTC count of 0x1234.
    """
    status_mask_obj = Dtc.Status(test_failed=True)
    severity_mask_obj = Dtc.Severity(check_immediately=True, check_at_next_exit=True)
    response = self.udsclient.get_number_of_dtc_by_status_severity_mask(
        status_mask=status_mask_obj,
        severity_mask=severity_mask_obj,
    )
    service_data = response.service_data
    self.assertEqual(service_data.dtc_format, Dtc.Format.ISO14229_1)
    self.assertEqual(service_data.dtc_count, 0x1234)
def test_set_severity_with_byte_no_error(self):
    """Check that ``dtc.severity.set_byte`` accepts every one-byte integer without raising."""
    dtc = Dtc(1)
    # A byte spans 0x00..0xFF inclusive; range(255) stopped at 0xFE and
    # never exercised 0xFF.
    for byte_value in range(0x100):
        dtc.severity.set_byte(byte_value)
elif len(response.data) < actual_byte+dtc_size:
partial_dtc_length = len(response.data)-actual_byte
if tolerate_zero_padding and response.data[actual_byte:] == b'\x00'*partial_dtc_length:
break
else:
# We purposely ignore extra byte for subfunction reportSeverityInformationOfDTC as it is supposed to return 0 or 1 DTC.
if subfunction != ReadDTCInformation.Subfunction.reportSeverityInformationOfDTC or actual_byte == 2:
raise InvalidResponseException(response, 'Incomplete DTC record. Missing %d bytes to response to complete the record' % (dtc_size-partial_dtc_length))
else:
dtc_bytes = response.data[actual_byte:actual_byte+dtc_size]
if dtc_bytes == b'\x00'*dtc_size and ignore_all_zero_dtc:
pass # ignore
else:
if subfunction in response_subfn_dtc_availability_mask_plus_dtc_record:
dtc = Dtc(struct.unpack('>L', b'\x00' + dtc_bytes[0:3])[0])
dtc.status.set_byte(dtc_bytes[3])
elif subfunction in response_subfn_dtc_availability_mask_plus_dtc_record_with_severity:
dtc = Dtc(struct.unpack('>L', b'\x00' + dtc_bytes[2:5])[0])
dtc.severity.set_byte(dtc_bytes[0])
dtc.functional_unit = dtc_bytes[1]
dtc.status.set_byte(dtc_bytes[5])
response.service_data.dtcs.append(dtc)
actual_byte += dtc_size
response.service_data.dtc_count = len(response.service_data.dtcs)
# The 2 following subfunction responses have different purposes but their constructions are very similar.
elif subfunction in response_subfn_dtc_plus_fault_counter + response_subfn_dtc_plus_sapshot_record:
dtc_size = 4
if len(response.data) < 1:
raise InvalidResponseException(response, 'Response must be at least 1 byte long (echo of subfunction)')
if len(response.data) < 5:
raise InvalidResponseException(response, 'Response must be exactly 5 bytes long ')
response.service_data.status_availability = Dtc.Status.from_byte(response.data[1])
response.service_data.dtc_format = response.data[2]
response.service_data.dtc_count = struct.unpack('>H', response.data[3:5])[0]
# This group of responses returns DTC snapshots
# Responses include a DTC, many snapshot records. For each record, we find many Data Identifiers.
# We create one Dtc.Snapshot for each DID. That'll be easier to work with.
#
elif subfunction in response_sbfn_dtc_status_snapshots_records:
if len(response.data) < 5:
raise InvalidResponseException(response, 'Response must be at least 5 bytes long ')
dtc = Dtc(struct.unpack('>L', b'\x00' + response.data[1:4])[0])
dtc.status.set_byte(response.data[4])
actual_byte = 5 # Increasing index
ServiceHelper.validate_int(dtc_snapshot_did_size, min=1, max=8, name='dtc_snapshot_did_size')
while True: # Loop until we have read all dtcs
if len(response.data) <= actual_byte:
break # done
remaining_data = response.data[actual_byte:]
if tolerate_zero_padding and remaining_data == b'\x00' * len(remaining_data):
break
if len(remaining_data) < 2:
raise InvalidResponseException(response, 'Incomplete response from server. Missing "number of identifier" and following data')
if tolerate_zero_padding and response.data[actual_byte:] == b'\x00'*partial_dtc_length:
break
else:
raise InvalidResponseException(response, 'Incomplete DTC record. Missing %d bytes to response to complete the record' % (dtc_size-partial_dtc_length))
else:
dtc_bytes = response.data[actual_byte:actual_byte+dtc_size]
if dtc_bytes == b'\x00'*dtc_size and ignore_all_zero_dtc:
pass # ignore
else:
dtcid = struct.unpack('>L', b'\x00' + dtc_bytes[0:3])[0]
# We create the DTC or get its reference if already created.
dtc_created = False
if dtcid in dtc_map and subfunction in response_subfn_dtc_plus_sapshot_record:
dtc = dtc_map[dtcid]
else:
dtc = Dtc(dtcid)
dtc_map[dtcid] = dtc
dtc_created = True
# We either read the DTC fault counter or Snapshot record number.
if subfunction in response_subfn_dtc_plus_fault_counter:
dtc.fault_counter = dtc_bytes[3]
elif subfunction in response_subfn_dtc_plus_sapshot_record:
record_number = dtc_bytes[3]
if dtc.snapshots is None:
dtc.snapshots = []
dtc.snapshots.append(record_number)
# Adds the DTC to the list.
def read_dtc_information(self, subfunction, status_mask=None, severity_mask=None, dtc=None, snapshot_record_number=None, extended_data_record_number=None, extended_data_size=None):
if dtc is not None and isinstance(dtc, Dtc):
dtc = dtc.id
request_params = {
'subfunction' : subfunction,
'status_mask' : status_mask,
'severity_mask' : severity_mask,
'dtc' : dtc,
'snapshot_record_number' : snapshot_record_number,
'extended_data_record_number' : extended_data_record_number
}
request = services.ReadDTCInformation.make_request(**request_params)
self.logger.info('%s - Sending request with subfunction "%s" (0x%02X).' % (self.service_log_prefix(services.ReadDTCInformation), services.ReadDTCInformation.Subfunction.get_name(subfunction), subfunction))
self.logger.debug('\tParams are : %s' % str(request_params))
response = self.send_request(request)