Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def name_bit(signal):
offset = start_bit(signal) + signal.length - 1
if signal.byte_order == 'big_endian':
return (8 * (offset // 8) + (7 - (offset % 8)))
else:
return offset
senders.append(_get_node_name_by_id(nodes,
sender.attrib['id']))
# Find all signals in this message.
signals = []
for mux in message.iterfind('ns:Multiplex', NAMESPACES):
signals += _load_multiplex_element(mux, nodes)
for signal in message.iterfind('ns:Signal', NAMESPACES):
signals.append(_load_signal_element(signal, nodes))
if length == 'auto':
if signals:
last_signal = sorted(signals, key=start_bit)[-1]
length = (start_bit(last_signal) + last_signal.length + 7) // 8
else:
length = 0
else:
length = int(length)
return Message(frame_id=frame_id,
is_extended_frame=is_extended_frame,
name=name,
length=length,
senders=senders,
send_type=None,
cycle_time=interval,
signals=signals,
comment=notes,
bus_name=bus_name,
strict=strict)
for mux in message.iterfind('ns:Multiplex', NAMESPACES):
signals += _load_multiplex_element(mux, nodes)
for signal in message.iterfind('ns:Signal', NAMESPACES):
signals.append(_load_signal_element(signal, nodes))
if length == 'auto':
if signals:
last_signal = sorted(signals, key=start_bit)[-1]
length = (start_bit(last_signal) + last_signal.length + 7) // 8
else:
length = 0
else:
length = int(length)
return Message(frame_id=frame_id,
is_extended_frame=is_extended_frame,
name=name,
length=length,
senders=senders,
send_type=None,
cycle_time=interval,
signals=signals,
comment=notes,
bus_name=bus_name,
strict=strict)
if tokens[9]:
comment = _load_comment(tokens[9][0])
# The rest.
unit, factor, offset, enum, minimum, maximum, decimal = _load_signal_attributes(
tokens[8],
enum,
enums,
minimum,
maximum,
decimal)
if byte_order == 'big_endian':
start = (8 * (start // 8) + (7 - (start % 8)))
return Signal(name=name,
start=start,
length=length,
receivers=[],
byte_order=byte_order,
is_signed=is_signed,
scale=factor,
offset=offset,
minimum=minimum,
maximum=maximum,
unit=unit,
choices=enum,
comment=comment,
is_multiplexer=False,
multiplexer_ids=multiplexer_ids,
multiplexer_signal=multiplexer_signal,
is_float=is_float,
def test_immediate_quit(self,
use_default_colors,
curs_set,
init_pair,
is_term_resized,
color_pair,
bus,
notifier):
# Prepare mocks.
stdscr = StdScr()
args = Args('tests/files/dbc/motohawk.dbc')
color_pair.side_effect = ['green', 'cyan']
is_term_resized.return_value = False
# Run monitor.
monitor = Monitor(stdscr, args)
monitor.run()
# Check mocks.
self.assert_called(use_default_colors, [call()])
self.assert_called(curs_set, [call(False)])
self.assert_called(
init_pair,
[
call(1, curses.COLOR_BLACK, curses.COLOR_GREEN),
call(2, curses.COLOR_BLACK, curses.COLOR_CYAN)
])
self.assert_called(color_pair, [call(1), call(2)])
self.assert_called(bus, [call(bustype='socketcan', channel='vcan0')])
self.assert_called(
stdscr.addstr,
[
def test_string_attribute_definition_dump(self):
filename = os.path.join('tests', 'files', 'test_multiplex_dump.dbc')
db = cantools.db.load_file(filename)
dumped_db = cantools.db.load_string(db.as_dbc_string())
attribute = dumped_db.dbc.attribute_definitions
self.assertEqual(attribute['BusType'].type_name, "STRING")
def test_the_homer_encode_decode_signed(self):
filename = os.path.join('tests', 'files', 'the_homer.kcd')
db = cantools.db.load_file(filename)
datas = [
(
{
'TankLevel': 0,
'TankTemperature': -32768,
'FillingStatus': 0
},
b'\x00\x00\x00\x80\x00'
),
(
{
'TankLevel': 65535,
'TankTemperature': 32767,
'FillingStatus': 15
},
def test_get_bus_by_name(self):
filename = os.path.join('tests', 'files', 'the_homer.kcd')
db = cantools.db.load_file(filename)
self.assertIs(db.get_bus_by_name('Comfort'), db.buses[2])
with self.assertRaises(KeyError) as cm:
db.get_bus_by_name('Missing')
self.assertEqual(str(cm.exception), "'Missing'")
def test_get_signal_by_name(self):
filename = os.path.join('tests', 'files', 'foobar.dbc')
db = cantools.db.load_file(filename)
message = db.get_message_by_name('Foo')
signal = message.get_signal_by_name('Foo')
self.assertEqual(signal.name, 'Foo')
signal = message.get_signal_by_name('Bar')
self.assertEqual(signal.name, 'Bar')
with self.assertRaises(KeyError) as cm:
message.get_signal_by_name('Fum')
self.assertEqual(str(cm.exception), "'Fum'")
def test_multiplex_choices(self):
filename = os.path.join('tests', 'files', 'multiplex_choices.dbc')
db = cantools.db.load_file(filename)
# With Multiplexor and BIT_L as strings.
decoded_message = {
'Multiplexor': 'MULTIPLEXOR_8',
'BIT_C': 1, 'BIT_G': 1, 'BIT_J': 1, 'BIT_L': 'On'
}
encoded_message = b'\x20\x00\x8c\x01\x00\x00\x00\x00'
message_1 = db.messages[0]
encoded = message_1.encode(decoded_message)
self.assertEqual(encoded, encoded_message)
decoded = message_1.decode(encoded)
self.assertEqual(decoded, decoded_message)
# With Multiplexor and BIT_L as numbers.