Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
new_frame = db.add_frame(
canmatrix.Frame(
name,
size=int(size),
transmitters=transmitters))
new_frame.arbitration_id = canmatrix.ArbitrationId.from_compound_integer(int(arb_id))
# Frame(int(Id), name, size, transmitter))
if extended == 'X':
logger.debug("Extended")
new_frame.arbitration_id.extended = True
if line.startswith("[NODE]"):
temp_str = line.strip()[6:].strip()
bo_list = temp_str.split(',')
for bo in bo_list:
db.add_ecu(canmatrix.Ecu(bo))
if line.startswith("[START_SIGNALS]"):
temp_str = line.strip()[15:].strip()
temp_array = temp_str.split(',')
(name, size, start_byte, start_bit, sign, max_val, min_val, byteorder,
offset, factor, unit, multiplex) = temp_array[0:12]
min_val = float_factory(min_val)
max_val = float_factory(max_val)
factor = float_factory(factor)
offset = float_factory(offset)
if len(temp_array) > 12:
receiver = temp_array[12].split(',')
else:
receiver = []
frame.add_transmitter(get_element_name(ecu_elem, ns))
else:
pass
# for inf in inFrame:
# if inf in multiplexTranslation:
# inf = multiplexTranslation[inf]
# frame = db.frameByName(inf)
# if frame is not None:
# for signal in frame.signals:
# recname = arGetName(ecu, ns)
# if recname not in signal.receiver:
# signal.receiver.append(recname)
# else:
# print "in not found: " + inf
new_ecu = canmatrix.Ecu(get_element_name(ecu_elem, ns))
if nm_address is not None:
new_ecu.add_attribute("NWM-Stationsadresse", nm_address.text)
new_ecu.add_attribute("NWM-Knoten", "ja")
else:
new_ecu.add_attribute("NWM-Stationsadresse", "0")
new_ecu.add_attribute("NWM-Knoten", "nein")
return new_ecu
float_factory = options.get("float_factory", default_float_factory) # type: typing.Callable
dbs = {} # type: typing.Dict[str, canmatrix.CanMatrix]
tree = lxml.etree.parse(f)
root = tree.getroot()
namespace = "{" + tree.xpath('namespace-uri(.)') + "}"
node_list = {}
nodes = root.findall('./' + namespace + 'Node')
buses = root.findall('./' + namespace + 'Bus')
counter = 0
for bus in buses:
db = canmatrix.CanMatrix()
db.add_frame_defines("GenMsgCycleTime", 'INT 0 65535')
for node in nodes:
db.ecus.append(canmatrix.Ecu(node.get('name')))
node_list[node.get('id')] = node.get('name')
messages = bus.findall('./' + namespace + 'Message')
for message in messages:
dlc = None
# new_frame = Frame(int(message.get('id'), 16), message.get('name'), 1, None)
new_frame = canmatrix.Frame(message.get('name'))
if 'triggered' in message.attrib:
new_frame.add_attribute("GenMsgCycleTime", message.get('interval'))
if 'length' in message.attrib:
dlc = int(message.get('length'))
new_frame.size = dlc
for key in sheet[0]:
if sheet[0][key].strip() == 'Byteorder':
ecu_start = letter_index.index(key) + 1
break
else:
for key in sheet[0]:
if sheet[0][key].strip() == 'Signal Not Available':
ecu_start = letter_index.index(key) + 1
for key in sheet[0]:
if sheet[0][key].strip() == 'Value':
ecu_end = letter_index.index(key)
# ECUs:
for x in range(ecu_start, ecu_end):
db.add_ecu(canmatrix.Ecu(sheet[0][letter_index[x]]))
# initialize:
frame_id = None
signal_name = ""
signal_length = 8
new_frame = None # type: typing.Optional[canmatrix.Frame]
new_signal = None # type: typing.Optional[canmatrix.Signal]
for row in sheet[1]:
# ignore empty row
if 'ID' not in row:
continue
# new frame detected
if row['ID'] != frame_id:
# new Frame
frame_id = row['ID']
def copy_ecu_with_frames(ecu_or_glob, source_db, target_db, rx=True, tx=True):
# type: (typing.Union[canmatrix.Ecu, str], canmatrix.CanMatrix, canmatrix.CanMatrix) -> None
"""
Copy ECU(s) identified by Name or as Object from source CAN matrix to target CAN matrix.
This function additionally copy all relevant Frames and Defines.
:param ecu_or_glob: Ecu instance or glob pattern for Ecu name
:param source_db: Source CAN matrix
:param target_db: Destination CAN matrix
"""
# check whether ecu_or_glob is object or symbolic name
if isinstance(ecu_or_glob, canmatrix.Ecu):
ecu_list = [ecu_or_glob]
else:
ecu_list = source_db.glob_ecus(ecu_or_glob)
for ecu in ecu_list:
logger.info("Copying ECU " + ecu.name)
target_db.add_ecu(copy.deepcopy(ecu))
# copy tx-frames
if tx is True:
for frame in source_db.frames:
if ecu.name in frame.transmitters:
copy_frame(frame.arbitration_id, source_db, target_db)
# copy rx-frames
index['function'] = i
elif "Byteorder" in value:
index['byteorder'] = i
else:
if 'Value' in index and i > index['Value']:
additional_inputs[i] = value
if "byteorder" in index:
index['ECUstart'] = index['byteorder'] + 1
else:
index['ECUstart'] = index['signalSNA'] + 1
index['ECUend'] = index['Value']
# ECUs:
for x in range(index['ECUstart'], index['ECUend']):
db.add_ecu(canmatrix.Ecu(sh.cell(0, x).value))
# initialize:
frame_id = None
signal_name = ""
new_frame = None
for row_num in range(1, sh.nrows):
# ignore empty row
if len(sh.cell(row_num, index['ID']).value) == 0:
break
# new frame detected
if sh.cell(row_num, index['ID']).value != frame_id:
# new Frame
frame_id = sh.cell(row_num, index['ID']).value
frame_name = sh.cell(row_num, index['frameName']).value
cycle_time = sh.cell(row_num, index['cycle']).value