Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
# Command-line entry: decode one CAN frame against a matrix file.
#   argv[1] - path of the matrix file to load
#   argv[2] - frame in "<hex arbitration id>#<hex payload>" form
if len(sys.argv) < 3:
    print(usage)
    sys.exit(1)

# load matrix
db = canmatrix.formats.loadp_flat(sys.argv[1])

# load frame data from argv
frame_string = sys.argv[2]
arbitration_id_string, separator, hexdata = frame_string.partition('#')
if not separator:
    # Without the '#' separator the argument cannot be split into id and
    # payload; treat it like any other malformed invocation.
    print(usage)
    sys.exit(1)

# set arbitration_id: up to three hex digits select a standard frame,
# anything longer is taken as an extended frame
arbitration_id = canmatrix.ArbitrationId(
    int(arbitration_id_string, 16),
    extended=len(arbitration_id_string) > 3,
)

# find frame to given arbitration_id
frame = db.frame_by_id(arbitration_id)
if frame is None:
    # No matching frame in the matrix: exit with a message on stderr
    # instead of failing on frame.decode() below.
    sys.exit("no frame with arbitration id 0x{} found in {}".format(
        arbitration_id_string, sys.argv[1]))

can_data = bytearray.fromhex(hexdata)

# decode frame
decoded = frame.decode(can_data)

# print decoded signals as "<name>\t<raw value in hex>\t(<physical value>)"
for signal, value in decoded.items():
    print(signal + "\t" + hex(value.raw_value) + "\t(" + str(value.phys_value) + ")")
temp = regexp.match(decoded)
if temp:
db.add_attribute(temp.group(1), temp.group(2))
elif decoded.startswith("SIG_GROUP_ "):
regexp = re.compile(r"^SIG_GROUP_ +(\w+) +(\w+) +(\w+) +\:(.*) *;")
temp = regexp.match(decoded)
frame = get_frame_by_id(canmatrix.ArbitrationId.from_compound_integer(int(temp.group(1))))
if frame is not None:
signal_array = temp.group(4).split(' ')
frame.add_signal_group(temp.group(2), temp.group(3), signal_array) # todo wrong annotation in canmatrix? Id is a string?
elif decoded.startswith("SIG_VALTYPE_ "):
regexp = re.compile(r"^SIG_VALTYPE_ +(\w+) +(\w+)\s*\:(.*) *;")
temp = regexp.match(decoded)
frame = get_frame_by_id(canmatrix.ArbitrationId.from_compound_integer(int(temp.group(1))))
if frame:
signal = frame.signal_by_name(temp.group(2))
signal.is_float = True
# SIG_VALTYPE_ 0 float : 1;
elif decoded.startswith("BA_DEF_DEF_ "):
pattern = r"^BA_DEF_DEF_ +\"(.+?)\" +(.+?) *;"
regexp = re.compile(pattern)
regexp_raw = re.compile(pattern.encode(dbc_import_encoding))
temp = regexp.match(decoded)
temp_raw = regexp_raw.match(l)
if temp:
db.add_define_default(temp.group(1),
temp_raw.group(2).decode(dbc_import_encoding))
elif decoded.startswith("SG_MUL_VAL_ "):
pattern = r"^SG_MUL_VAL_ +([0-9]+) +([\w\-]+) +([\w\-]+) +(.*) *;"
def dump(mydb, f, **options):
# type: (canmatrix.CanMatrix, typing.IO, **typing.Any) -> None
# create copy because export changes database
db = copy.deepcopy(mydb)
dbf_export_encoding = options.get("dbfExportEncoding", 'iso-8859-1')
ignore_encoding_errors = options.get("ignoreExportEncodingErrors", "")
db.enum_attribs_to_keys()
if len(db.signals) > 0:
free_signals_dummy_frame = canmatrix.Frame("VECTOR__INDEPENDENT_SIG_MSG")
free_signals_dummy_frame.arbitration_id = canmatrix.ArbitrationId(id=0x40000000, extended=True)
free_signals_dummy_frame.signals = db.signals
db.add_frame(free_signals_dummy_frame)
out_str = """//******************************BUSMASTER Messages and signals Database ******************************//
[DATABASE_VERSION] 1.3
[PROTOCOL] CAN
[BUSMASTER_VERSION] [1.7.2]
[NUMBER_OF_MESSAGES] """
out_str += str(len(db.frames)) + "\n"
# Frames
for frame in db.frames:
if pdu is None:
logger.error("pdu is None")
else:
logger.debug(get_element_name(pdu, ns))
if pdu is not None and "MULTIPLEXED-I-PDU" in pdu.tag:
get_frame_from_multiplexed_ipdu(pdu, new_frame, multiplex_translation, root_or_cache, ns, float_factory)
if new_frame.comment is None:
new_frame.add_comment(get_element_desc(pdu, root_or_cache, ns))
if address_mode is not None and address_mode.text == 'EXTENDED':
new_frame.arbitration_id = canmatrix.ArbitrationId(arbitration_id, extended=True)
else:
new_frame.arbitration_id = canmatrix.ArbitrationId(arbitration_id, extended=False)
if (frame_rx_behaviour_elem is not None and frame_rx_behaviour_elem.text == 'CAN-FD') or \
(frame_tx_behaviour_elem is not None and frame_tx_behaviour_elem.text == 'CAN-FD') or \
(is_fd_elem is not None and is_fd_elem.text == 'TRUE'):
new_frame.is_fd = True
else:
new_frame.is_fd = False
timing_spec = get_child(pdu, "I-PDU-TIMING-SPECIFICATION", root_or_cache, ns)
if timing_spec is None:
timing_spec = get_child(pdu, "I-PDU-TIMING-SPECIFICATIONS", root_or_cache, ns)
cyclic_timing = get_child(timing_spec, "CYCLIC-TIMING", root_or_cache, ns)
repeating_time = get_child(cyclic_timing, "REPEATING-TIME", root_or_cache, ns)
event_timing = get_child(timing_spec, "EVENT-CONTROLLED-TIMING", root_or_cache, ns)
repeats = get_child(event_timing, "NUMBER-OF-REPEATS", root_or_cache, ns)
messages = bus.findall('./' + namespace + 'Message')
for message in messages:
dlc = None
# new_frame = Frame(int(message.get('id'), 16), message.get('name'), 1, None)
new_frame = canmatrix.Frame(message.get('name'))
if 'triggered' in message.attrib:
new_frame.add_attribute("GenMsgCycleTime", message.get('interval'))
if 'length' in message.attrib:
dlc = int(message.get('length'))
new_frame.size = dlc
if 'format' in message.attrib and message.get('format') == "extended":
new_frame.arbitration_id = canmatrix.ArbitrationId(int(message.get('id'), 16), extended=True)
else:
new_frame.arbitration_id = canmatrix.ArbitrationId(int(message.get('id'), 16), extended=False)
multiplex = message.find('./' + namespace + 'Multiplex')
if multiplex is not None:
start_bit = 0
if 'offset' in multiplex.attrib:
start_bit = int(multiplex.get('offset'))
signal_size = 1
if 'length' in multiplex.attrib:
signal_size = int(multiplex.get('length'))
is_little_endian = True
min_value = None