Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import codecs
import sys
sys.path.append('..')
import canmatrix
#
# create target Matrix
#
db = canmatrix.CanMatrix()
db.boardUnits.add(canmatrix.BoardUnit("testBU"))
db.boardUnits.add(canmatrix.BoardUnit("recBU"))
myFrame1 = canmatrix.Frame("canFdStandard1",Id=1, dlc=24, is_fd = True, transmitter=["testBU"])
myFrame2 = canmatrix.Frame("CanFdExtended2",Id=2, dlc=16, extended = True, is_fd = True, transmitter=["testBU"])
myFrame3 = canmatrix.Frame("CanExtended3", Id=3, dlc=8, extended = True, transmitter=["testBU"])
myFrame4 = canmatrix.Frame("CanStandard4", Id=4, dlc=8)
mySignal1 = canmatrix.Signal("signal1", signalSize=64, startBit=0, is_little_endian=True, min=0, max=0, is_signed=True, receiver=["recBU"])
mySignal2 = canmatrix.Signal("signal2", signalSize=64, startBit=64, is_little_endian=True, min=0, max=0, is_signed=True, receiver=["recBU"])
mySignal3 = canmatrix.Signal("signal3", signalSize=64, startBit=128, is_little_endian=True, min=0, max=0, is_signed=True)
myFrame1.addSignal(mySignal3)
myFrame1.addSignal(mySignal2)
myFrame1.addSignal(mySignal1)
myFrame2.addSignal(mySignal2)
myFrame2.addSignal(mySignal1)
sys.path.append('..')
import canmatrix
#
# create target Matrix
#
db = canmatrix.CanMatrix()
db.ecus.add(canmatrix.ecu("testBU"))
db.ecus.add(canmatrix.ecu("recBU"))
myFrame1 = canmatrix.Frame("canFdStandard1",Id=1, dlc=24, is_fd = True, transmitter=["testBU"])
myFrame2 = canmatrix.Frame("CanFdExtended2",Id=2, dlc=16, extended = True, is_fd = True, transmitter=["testBU"])
myFrame3 = canmatrix.Frame("CanExtended3", Id=3, dlc=8, extended = True, transmitter=["testBU"])
myFrame4 = canmatrix.Frame("CanStandard4", Id=4, dlc=8)
mySignal1 = canmatrix.Signal("signal1", signalSize=64, startBit=0, is_little_endian=True, min=0, max=0, is_signed=True, receiver=["recBU"])
mySignal2 = canmatrix.Signal("signal2", signalSize=64, startBit=64, is_little_endian=True, min=0, max=0, is_signed=True, receiver=["recBU"])
mySignal3 = canmatrix.Signal("signal3", signalSize=64, startBit=128, is_little_endian=True, min=0, max=0, is_signed=True)
myFrame1.add_signal(mySignal3)
myFrame1.add_signal(mySignal2)
myFrame1.add_signal(mySignal1)
myFrame2.add_signal(mySignal2)
myFrame2.add_signal(mySignal1)
db.frames.add_frame(myFrame1)
db.frames.add_frame(myFrame2)
try:
comment += "\n" + \
l.decode(dbc_comment_encoding).replace('\\"', '"')
except:
logger.error("Error decoding line: %d (%s)" % (i, line))
if re.match('.*" *;\Z',l.decode(dbc_import_encoding).strip()) is not None:
follow_up = _FollowUps.NOTHING
if board_unit is not None:
board_unit.add_comment(comment[:-1].strip()[:-1])
continue
decoded = l.decode(dbc_import_encoding).strip()
if decoded.startswith("BO_ "):
regexp = re.compile(r"^BO_ ([^\ ]+) ([^\ ]+) *: ([^\ ]+) ([^\ ]+)")
temp = regexp.match(decoded)
# db.frames.addFrame(Frame(temp.group(1), temp.group(2), temp.group(3), temp.group(4)))
frame = canmatrix.Frame(temp.group(2), arbitration_id=int(temp.group(1)),
size=int(temp.group(3)), transmitters=temp.group(4).split())
db.frames.append(frame)
add_frame_by_id(frame)
elif decoded.startswith("SG_ "):
pattern = r"^SG_ +(\w+) *: *(\d+)\|(\d+)@(\d+)([\+|\-]) +\(([0-9.+\-eE]+), *([0-9.+\-eE]+)\) +\[([0-9.+\-eE]+)\|([0-9.+\-eE]+)\] +\"(.*)\" +(.*)"
regexp = re.compile(pattern)
temp = regexp.match(decoded)
regexp_raw = re.compile(pattern.encode(dbc_import_encoding))
temp_raw = regexp_raw.match(l)
if temp:
receiver = [b.strip() for b in temp.group(11).split(',')]
extras = {} # type: typing.Dict[typing.Any, typing.Any]
# if float_factory is not None:
# extras['float_factory'] = float_factory
#
frame2 = canmatrix.Frame("Node605", j1939_pgn = 0xff01, j1939_prio = 0x6,
j1939_source = 0x80,
comment="J1939 packet containing 8 byte payload")
sig = canmatrix.Signal("ch1", size=32, is_float=True, is_little_endian=False, startBit=0)
sig2 = canmatrix.Signal("ch2", size=32, is_float=True, is_little_endian=False, startBit=32)
frame2.add_signal(sig)
frame2.add_signal(sig2)
cm.add_frame(frame2)
#
# create frame Node606
#
frame3 = canmatrix.Frame("Node606", j1939_pgn = 0xff02, j1939_prio = 0x6,
j1939_source = 0x80,
comment="J1939 packet containing <8 byte payload")
sig = canmatrix.Signal("ch1", size=32, is_float=True, is_little_endian=False, startBit=0)
frame3.add_signal(sig)
cm.add_frame(frame3)
cm.recalc_dlc("force")
# save dbc
canmatrix.formats.dumpp({"":cm}, "example_j1939.dbc")
arbitration_id = int(arb_id.text)
if frame_elem is not None:
dlc_elem = get_child(frame_elem, "FRAME-LENGTH", root_or_cache, ns)
pdu_mappings = get_child(frame_elem, "PDU-TO-FRAME-MAPPINGS", root_or_cache, ns)
pdu_mapping = get_child(pdu_mappings, "PDU-TO-FRAME-MAPPING", root_or_cache, ns)
pdu = get_child(pdu_mapping, "PDU", root_or_cache, ns) # SIGNAL-I-PDU
if pdu is not None and 'SECURED-I-PDU' in pdu.tag:
payload = get_child(pdu, "PAYLOAD", root_or_cache, ns)
pdu = get_child(payload, "I-PDU", root_or_cache, ns)
# logger.info("found secured pdu - no signal extraction possible: %s", get_element_name(pdu, ns))
pdu_frame_mapping[pdu] = get_element_name(frame_elem, ns)
new_frame = canmatrix.Frame(get_element_name(frame_elem, ns), size=int(dlc_elem.text))
comment = get_element_desc(frame_elem, root_or_cache, ns)
if comment is not None:
new_frame.add_comment(comment)
else:
# without frameinfo take short-name of frametriggering and dlc = 8
logger.debug("Frame %s has no FRAME-REF", frame_name_elem.text)
ipdu_triggering_refs = get_child(frame_triggering, "I-PDU-TRIGGERING-REFS", root_or_cache, ns)
ipdu_triggering = get_child(ipdu_triggering_refs, "I-PDU-TRIGGERING", root_or_cache, ns)
pdu = get_child(ipdu_triggering, "I-PDU", root_or_cache, ns)
if pdu is None:
pdu = get_child(ipdu_triggering, "I-SIGNAL-I-PDU", root_or_cache, ns) # AR4.2
dlc_elem = get_child(pdu, "LENGTH", root_or_cache, ns)
new_frame = canmatrix.Frame(frame_name_elem.text, arbitration_id=arbitration_id, size=int(int(dlc_elem.text) / 8))
if pdu is None:
logger.error("pdu is None")
#!/usr/bin/env python3
import canmatrix.formats
cm = canmatrix.CanMatrix()
#
# create frame Node604
#
frame1 = canmatrix.Frame("Node604", j1939_pgn = 0xff00, j1939_prio = 0x6,
j1939_source = 0x80,
comment = "J1939 packet containing >8 byte payload")
for i in range(1,9):
sig = canmatrix.Signal("ch%d" % i, size = 32, is_float = True, is_little_endian = False, startBit = (i-1)*32)
frame1.add_signal(sig)
cm.add_frame(frame1)
#
# create frame Node605
#
frame2 = canmatrix.Frame("Node605", j1939_pgn = 0xff01, j1939_prio = 0x6,
j1939_source = 0x80,
comment="J1939 packet containing 8 byte payload")
sig = canmatrix.Signal("ch1", size=32, is_float=True, is_little_endian=False, startBit=0)
sig2 = canmatrix.Signal("ch2", size=32, is_float=True, is_little_endian=False, startBit=32)
buses = root.findall('./' + namespace + 'Bus')
counter = 0
for bus in buses:
db = canmatrix.CanMatrix()
db.add_frame_defines("GenMsgCycleTime", 'INT 0 65535')
for node in nodes:
db.ecus.append(canmatrix.Ecu(node.get('name')))
node_list[node.get('id')] = node.get('name')
messages = bus.findall('./' + namespace + 'Message')
for message in messages:
dlc = None
# new_frame = Frame(int(message.get('id'), 16), message.get('name'), 1, None)
new_frame = canmatrix.Frame(message.get('name'))
if 'triggered' in message.attrib:
new_frame.add_attribute("GenMsgCycleTime", message.get('interval'))
if 'length' in message.attrib:
dlc = int(message.get('length'))
new_frame.size = dlc
if 'format' in message.attrib and message.get('format') == "extended":
new_frame.arbitration_id = canmatrix.ArbitrationId(int(message.get('id'), 16), extended=True)
else:
new_frame.arbitration_id = canmatrix.ArbitrationId(int(message.get('id'), 16), extended=False)
multiplex = message.find('./' + namespace + 'Multiplex')
if multiplex is not None:
start_bit = 0
# ignore empty row
if 'ID' not in row:
continue
# new frame detected
if row['ID'] != frame_id:
# new Frame
frame_id = row['ID']
frame_name = row['Frame Name']
cycle_time = get_if_possible(row, 'Cycle Time [ms]', '0')
launch_type = get_if_possible(row, 'Launch Type')
dlc = 8
# launch_param = get_if_possible(row, 'Launch Parameter', '0')
# launch_param = str(int(launch_param))
if frame_id.endswith("xh"):
new_frame = canmatrix.Frame(frame_name, arbitration_id=int(frame_id[:-2], 16), size=dlc)
new_frame.arbitration_id.extended = True
else:
new_frame = canmatrix.Frame(frame_name, arbitration_id=int(frame_id[:-1], 16), size=dlc)
db.add_frame(new_frame)
# eval launch_type
if launch_type is not None:
new_frame.add_attribute("GenMsgSendType", launch_type)
if launch_type not in launch_types:
launch_types.append(launch_type)
new_frame.cycle_time = cycle_time
# new signal detected
if 'Signal Name' in row and row['Signal Name'] != signal_name:
#
# create frame Node604
#
frame1 = canmatrix.Frame("Node604", j1939_pgn = 0xff00, j1939_prio = 0x6,
j1939_source = 0x80,
comment = "J1939 packet containing >8 byte payload")
for i in range(1,9):
sig = canmatrix.Signal("ch%d" % i, size = 32, is_float = True, is_little_endian = False, startBit = (i-1)*32)
frame1.add_signal(sig)
cm.add_frame(frame1)
#
# create frame Node605
#
frame2 = canmatrix.Frame("Node605", j1939_pgn = 0xff01, j1939_prio = 0x6,
j1939_source = 0x80,
comment="J1939 packet containing 8 byte payload")
sig = canmatrix.Signal("ch1", size=32, is_float=True, is_little_endian=False, startBit=0)
sig2 = canmatrix.Signal("ch2", size=32, is_float=True, is_little_endian=False, startBit=32)
frame2.add_signal(sig)
frame2.add_signal(sig2)
cm.add_frame(frame2)
#
# create frame Node606
#
frame3 = canmatrix.Frame("Node606", j1939_pgn = 0xff02, j1939_prio = 0x6,
j1939_source = 0x80,
comment="J1939 packet containing <8 byte payload")