Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
for mux in message.iterfind('ns:Multiplex', NAMESPACES):
signals += _load_multiplex_element(mux, nodes)
for signal in message.iterfind('ns:Signal', NAMESPACES):
signals.append(_load_signal_element(signal, nodes))
if length == 'auto':
if signals:
last_signal = sorted(signals, key=start_bit)[-1]
length = (start_bit(last_signal) + last_signal.length + 7) // 8
else:
length = 0
else:
length = int(length)
return Message(frame_id=frame_id,
is_extended_frame=is_extended_frame,
name=name,
length=length,
senders=senders,
send_type=None,
cycle_time=interval,
signals=signals,
comment=notes,
bus_name=bus_name,
strict=strict)
comment = None
if 'Len' in message_tokens[3]:
length = int(message_tokens[3]['Len'][0][2])
# Cycle time.
try:
cycle_time = num(message_tokens[3]['CycleTime'][0][2])
except (KeyError, IndexError):
pass
# Comment.
if message_tokens[3]['ID'][0][-1]:
comment = _load_comment(message_tokens[3]['ID'][0][-1][0])
return Message(frame_id=frame_id,
is_extended_frame=is_extended_frame,
name=name,
length=length,
senders=[],
send_type=None,
cycle_time=cycle_time,
signals=_load_message_signals(message_tokens,
message_section_tokens,
signals,
enums),
comment=comment,
bus_name=None,
strict=strict)
else:
multiplexer_signal = None
break
signals = _load_signals(message[6],
comments,
attributes,
definitions,
choices,
signal_types,
signal_multiplexer_values,
frame_id_dbc,
multiplexer_signal)
messages.append(
Message(frame_id=frame_id,
is_extended_frame=is_extended_frame,
name=get_message_name(frame_id_dbc, message[2]),
length=int(message[4], 0),
senders=senders,
send_type=get_send_type(frame_id_dbc),
cycle_time=get_cycle_time(frame_id_dbc),
dbc_specifics=DbcSpecifics(
attributes=get_attributes(frame_id_dbc),
attribute_definitions=definitions),
signals=signals,
comment=get_comment(frame_id_dbc),
strict=strict,
protocol=get_protocol(frame_id_dbc),
bus_name=bus_name))
return messages
time_period = i_signal_i_pdu.find(TIME_PERIOD_XPATH, NAMESPACES)
if time_period is not None:
cycle_time = int(float(time_period.text) * 1000)
i_signal_to_i_pdu_mappings = i_signal_i_pdu.iterfind(
I_SIGNAL_TO_I_PDU_MAPPING_XPATH,
NAMESPACES)
for i_signal_to_i_pdu_mapping in i_signal_to_i_pdu_mappings:
signal = self.load_signal(i_signal_to_i_pdu_mapping)
if signal is not None:
signals.append(signal)
return Message(frame_id=frame_id,
is_extended_frame=is_extended_frame,
name=name,
length=length,
senders=senders,
send_type=None,
cycle_time=cycle_time,
signals=signals,
comment=comment,
bus_name=None,
strict=self.strict)
NAMESPACES)
for value in values:
definition_ref = value.find(DEFINITION_REF_XPATH,
NAMESPACES).text
if not definition_ref.endswith('ComIPduSignalRef'):
continue
value_ref = value.find(VALUE_REF_XPATH, NAMESPACES)
signal = self.load_signal(value_ref.text)
if signal is not None:
signals.append(signal)
return Message(frame_id=frame_id,
is_extended_frame=is_extended_frame,
name=name,
length=length,
senders=senders,
send_type=None,
cycle_time=interval,
signals=signals,
comment=comment,
bus_name=None,
strict=self.strict)