Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_simple_protocol(self):
    """
    Generate a simple protocol and create a length engine for its bitvectors.

    Every message has a preamble (8), a sync word (16) and a length field (8).
    Payloads are generated in three batches: 5 messages with 8 bit data,
    10 messages with 16 bit data and 15 messages with 32 bit data.
    :return:
    """
    builder = MessageTypeBuilder("simple_length_test")
    label_layout = (
        (FieldType.Function.PREAMBLE, 8),
        (FieldType.Function.SYNC, 16),
        (FieldType.Function.LENGTH, 8),
    )
    for function, size in label_layout:
        builder.add_label(function, size)

    # Maps payload length -> number of messages generated with that payload length
    num_messages_by_data_length = {8: 5, 16: 10, 32: 15}

    pg = ProtocolGenerator([builder.message_type],
                           syncs_by_mt={builder.message_type: "0x9a9d"})
    for data_length in num_messages_by_data_length:
        for msg_index in range(num_messages_by_data_length[data_length]):
            payload = pg.decimal_to_bits(5 * msg_index, data_length)
            pg.generate_message(data=payload)

    self.save_protocol("simple_length", pg)

    # Message types are cleared before the messages are handed to the format finder
    messages = pg.protocol.messages
    self.clear_message_types(messages)
    ff = FormatFinder(pg.protocol.messages)

    length_engine = LengthEngine(ff.bitvectors)
def test_medium_protocol(self):
    """
    Set up a protocol with preamble, sync, length field, source and destination address,
    sequence number and random data, exchanged between two participants.
    :return:
    """
    builder = MessageTypeBuilder("medium_test")
    label_layout = (
        (FieldType.Function.PREAMBLE, 8),
        (FieldType.Function.SYNC, 8),
        (FieldType.Function.LENGTH, 8),
        (FieldType.Function.SRC_ADDRESS, 16),
        (FieldType.Function.DST_ADDRESS, 16),
        (FieldType.Function.SEQUENCE_NUMBER, 16),
    )
    for function, size in label_layout:
        builder.add_label(function, size)

    alice = Participant("Alice", "A", "1234", color_index=0)
    bob = Participant("Bob", "B", "5a9d", color_index=1)

    num_messages = 100
    pg = ProtocolGenerator([builder.message_type],
                           syncs_by_mt={builder.message_type: "0x1c"},
                           little_endian=False)

    for i in range(num_messages):
        # Between one and five random 8 bit values, concatenated into one payload string
        len_data = random.randint(1, 5)
        chunks = [pg.decimal_to_bits(random.randint(0, 2 ** 8 - 1), 8) for _ in range(len_data)]
        data = "".join(chunks)

        # The participants alternate: Alice sends on even indices, Bob on odd ones
        source, dest = (alice, bob) if i % 2 == 0 else (bob, alice)
def __init_field_types():
    """
    Create one FieldType for every member of FieldType.Function.

    The member's value is passed as first argument and the member itself as second argument.

    :return: list of the created FieldType objects, in enum iteration order
    """
    return [FieldType(function.value, function) for function in FieldType.Function]
def test_crc_widget_in_protocol_label_dialog(self):
    """
    Open the label dialog for a message type holding a single checksum label and
    check that the advanced settings show exactly one tab, titled like the label.
    """
    message_type = MessageType("test")
    crc_field_type = FieldType("test_crc", FieldType.Function.CHECKSUM)
    message_type.append(ChecksumLabel("test_crc", 8, 16, 0, crc_field_type))

    message = Message([0] * 100, 0, message_type)
    self.dialog = ProtocolLabelController(0, message, 0)

    advanced_tabs = self.dialog.ui.tabWidgetAdvancedSettings
    self.assertEqual(advanced_tabs.count(), 1)
    self.assertEqual(advanced_tabs.tabText(0), "test_crc")
# NOTE(review): excerpt from a test method whose header is not part of this view;
# message_types is defined outside the visible lines.
# Expected preamble label of the first message type: start 0, length 8
preamble = message_types[0].get_first_label_with_type(FieldType.Function.PREAMBLE)
self.assertEqual(preamble.start, 0)
self.assertEqual(preamble.length, 8)
# Expected sync label: start 8, length 4
sync = message_types[0].get_first_label_with_type(FieldType.Function.SYNC)
self.assertEqual(sync.start, 8)
self.assertEqual(sync.length, 4)
# Expected checksum label: start 56, length 4
checksum = message_types[0].get_first_label_with_type(FieldType.Function.CHECKSUM)
self.assertEqual(checksum.start, 56)
self.assertEqual(checksum.length, 4)
# No address, length or sequence number label is expected in the first message type
self.assertIsNone(message_types[0].get_first_label_with_type(FieldType.Function.SRC_ADDRESS))
self.assertIsNone(message_types[0].get_first_label_with_type(FieldType.Function.DST_ADDRESS))
self.assertIsNone(message_types[0].get_first_label_with_type(FieldType.Function.LENGTH))
self.assertIsNone(message_types[0].get_first_label_with_type(FieldType.Function.SEQUENCE_NUMBER))
Perform a field inference iteration for messages of the given message type
This routine will return newly found fields as a set of Common Ranges
:param message_type:
:rtype: set of CommonRange
"""
# NOTE(review): excerpt from a method whose header is not part of this view.
# The entries of indices are used below to index bitvectors, hexvectors,
# participant_indices and sync_ends.
indices = self.existing_message_types[message_type]
engines = []
# We can take an arbitrary sync end to correct the already labeled fields for this message type,
# because if the existing labels would have different sync positions,
# they would not belong to the same message type in the first place
sync_end = self.sync_ends[indices[0]] if indices else 0
# Existing label ranges are shifted by sync_end; labels starting before sync_end are left out
already_labeled = [(lbl.start - sync_end, lbl.end - sync_end) for lbl in message_type if lbl.start >= sync_end]
# An engine is only added for a field type the message type has no label for yet
if not message_type.get_first_label_with_type(FieldType.Function.LENGTH):
engines.append(LengthEngine([self.bitvectors[i] for i in indices], already_labeled=already_labeled))
if not message_type.get_first_label_with_type(FieldType.Function.SRC_ADDRESS):
engines.append(AddressEngine([self.hexvectors[i] for i in indices],
[self.participant_indices[i] for i in indices],
self.known_participant_addresses,
already_labeled=already_labeled))
# A source address label exists but no destination address label:
# the address engine is created with src_field_present=True
elif not message_type.get_first_label_with_type(FieldType.Function.DST_ADDRESS):
engines.append(AddressEngine([self.hexvectors[i] for i in indices],
[self.participant_indices[i] for i in indices],
self.known_participant_addresses,
already_labeled=already_labeled,
src_field_present=True))
if not message_type.get_first_label_with_type(FieldType.Function.SEQUENCE_NUMBER):
engines.append(SequenceNumberEngine([self.bitvectors[i] for i in indices], already_labeled=already_labeled))
def __init__(
        self,
        name: str,
        start: int,
        end: int,
        color_index: int,
        field_type: FieldType,
        fuzz_created=False,
        auto_created=False,
        data_range_start=0,
):
    """
    Initialize a checksum label.

    :param name: passed on to the parent initializer
    :param start: passed on to the parent initializer
    :param end: passed on to the parent initializer
    :param color_index: passed on to the parent initializer
    :param field_type: field type of the label, its function must be FieldType.Function.CHECKSUM
    :param fuzz_created: passed on to the parent initializer
    :param auto_created: passed on to the parent initializer
    :param data_range_start: first value of the initial data range
    """
    assert field_type.function == FieldType.Function.CHECKSUM
    super().__init__(name, start, end, color_index, fuzz_created, auto_created, field_type)

    self.__category = self.Category.generic

    # A single initial data range reaching from data_range_start to the start of this label
    initial_range = [data_range_start, self.start]
    self.__data_ranges = [initial_range]  # type: list[list[int,int]]

    self.checksum = GenericCRC(polynomial=0)  # type: GenericCRC or WSPChecksum
def from_xml(tag: ET.Element):
    """
    Create a MessageType from an XML element.

    The attributes "name" (default "blank"), "id", "assigned_by_ruleset" and
    "assigned_by_logic_analyzer" (both default 0) are read from the element itself.
    Labels are parsed from the child elements "label" and "checksum_label",
    the ruleset from the child element "ruleset".

    :param tag: XML element to read the message type from
    :return: the created MessageType
    """
    field_types_by_caption = {field_type.caption: field_type for field_type in FieldType.load_from_xml()}

    name = tag.get("name", "blank")
    message_type_id = tag.get("id")
    by_ruleset = bool(int(tag.get("assigned_by_ruleset", 0)))
    by_logic_analyzer = bool(int(tag.get("assigned_by_logic_analyzer", 0)))

    # All plain labels come first, the checksum labels are placed behind them
    labels = [ProtocolLabel.from_xml(label_tag, field_types_by_caption=field_types_by_caption)
              for label_tag in tag.findall("label")]
    labels += [ChecksumLabel.from_xml(label_tag, field_types_by_caption=field_types_by_caption)
               for label_tag in tag.findall("checksum_label")]

    ruleset = Ruleset.from_xml(tag.find("ruleset"))
    message_type = MessageType(name=name, iterable=labels, id=message_type_id, ruleset=ruleset)
    message_type.assigned_by_ruleset = by_ruleset
    message_type.assigned_by_logic_analyzer = by_logic_analyzer
    return message_type
# NOTE(review): excerpt from an initializer whose header is not part of this view.
# One completer backed by a QDirModel is shared by both line edits
completer = QCompleter()
completer.setModel(QDirModel(completer))
self.ui.lineEditPython2Interpreter.setCompleter(completer)
self.ui.lineEditGnuradioDirectory.setCompleter(completer)
self.refresh_device_tab()
self.create_connects()
self.old_show_pause_as_time = False
# The field type table starts with an empty model
self.field_type_table_model = FieldTypeTableModel([], parent=self)
self.ui.tblLabeltypes.setModel(self.field_type_table_model)
self.ui.tblLabeltypes.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch)
# Column 1 is edited with a combo box listing the names of all FieldType.Function members,
# column 2 with a combo box listing ProtocolLabel.DISPLAY_FORMATS
self.ui.tblLabeltypes.setItemDelegateForColumn(1, ComboBoxDelegate([f.name for f in FieldType.Function],
return_index=False, parent=self))
self.ui.tblLabeltypes.setItemDelegateForColumn(2, ComboBoxDelegate(ProtocolLabel.DISPLAY_FORMATS, parent=self))
self.read_options()
# Widget values are captured after read_options() has been called
self.old_default_view = self.ui.comboBoxDefaultView.currentIndex()
self.old_num_sending_repeats = self.ui.spinBoxNumSendingRepeats.value()
self.ui.labelRebuildNativeStatus.setText("")
self.show_available_colormaps()
# The geometry is looked up in the settings under "<class name>/geometry".
# NOTE(review): the TypeError is presumably raised when no geometry has been stored yet - confirm
try:
self.restoreGeometry(constants.SETTINGS.value("{}/geometry".format(self.__class__.__name__)))
except TypeError:
pass
# NOTE(review): excerpt from a method whose header is not part of this view;
# msg, source, mt and checksum_labels are defined outside the visible lines.
msg.participant = source
# Advance the sequence number stored for this message type by the configured increment
self.sequence_numbers[mt] += self.sequence_number_increment
# Write the checksum calculated for the message into the range of each checksum label
for checksum_label in checksum_labels:
msg[checksum_label.start:checksum_label.end] = checksum_label.calculate_checksum_for_message(msg, False)
self.protocol.messages.append(msg)
def to_file(self, filename: str):
    """
    Write the protocol to an XML file.

    The protocol is written with an empty list as second argument,
    the participants of this object and write_bits enabled.

    :param filename: target file name, handed to the protocol's to_xml_file
    """
    participants = self.participants
    self.protocol.to_xml_file(filename, [], participants, write_bits=True)
if __name__ == '__main__':
    # Demo: build one message type, generate three messages and write them to a file
    mb = MessageTypeBuilder("test")
    for function, size in (
            (FieldType.Function.PREAMBLE, 8),
            (FieldType.Function.SYNC, 4),
            (FieldType.Function.LENGTH, 8),
            (FieldType.Function.SEQUENCE_NUMBER, 16),
            (FieldType.Function.SRC_ADDRESS, 16),
            (FieldType.Function.DST_ADDRESS, 16),
    ):
        mb.add_label(function, size)

    pg = ProtocolGenerator([mb.message_type], [], little_endian=False)

    # Two messages without participants: 8 and 16 one-bits as data
    for num_bits in (8, 16):
        pg.generate_message(data="1" * num_bits)

    # One message with explicit source and destination
    alice = Participant("Alice", "A", "1234")
    bob = Participant("Bob", "B", "4567")
    pg.generate_message(data="0xab", source=alice, destination=bob)

    pg.to_file("/tmp/test.proto")