Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
for i in range(num_messages):
if i % 4 == 0:
data = "1" * data_length
elif i % 4 == 1:
data = "0" * data_length
elif i % 4 == 2:
data = "10" * (data_length // 2)
else:
data = "01" * (data_length // 2)
pg.generate_message(data=data)
self.save_protocol("easy_length", pg)
self.clear_message_types(pg.protocol.messages)
ff = FormatFinder(pg.protocol.messages)
length_engine = LengthEngine(ff.bitvectors)
highscored_ranges = length_engine.find(n_gram_length=8)
self.assertEqual(len(highscored_ranges), 4)
ff.perform_iteration()
self.assertEqual(len(ff.message_types), 1)
self.assertGreater(len(ff.message_types[0]), 0)
print(ff.message_types[0])
label = ff.message_types[0].get_first_label_with_type(FieldType.Function.LENGTH)
self.assertIsInstance(label, ProtocolLabel)
self.assertEqual(label.start, 32)
self.assertEqual(label.length, 8)
def test_homematic(self):
proto_file = get_path_for_data_file("homematic.proto.xml")
protocol = ProtocolAnalyzer(signal=None, filename=proto_file)
protocol.message_types = []
protocol.from_xml_file(filename=proto_file, read_bits=True)
# prevent interfering with preassinged labels
protocol.message_types = [MessageType("Default")]
participants = sorted({msg.participant for msg in protocol.messages})
self.clear_message_types(protocol.messages)
ff = FormatFinder(protocol.messages, participants=participants)
ff.known_participant_addresses.clear()
ff.perform_iteration()
self.assertGreater(len(ff.message_types), 0)
for i, message_type in enumerate(ff.message_types):
preamble = message_type.get_first_label_with_type(FieldType.Function.PREAMBLE)
self.assertEqual(preamble.start, 0)
self.assertEqual(preamble.length, 32)
sync = message_type.get_first_label_with_type(FieldType.Function.SYNC)
self.assertEqual(sync.start, 32)
self.assertEqual(sync.length, 32)
length = message_type.get_first_label_with_type(FieldType.Function.LENGTH)
self.assertEqual(length.start, 64)
def test_format_finding_enocean(self):
enocean_protocol = ProtocolAnalyzer(None)
with open(get_path_for_data_file("enocean_bits.txt")) as f:
for line in f:
enocean_protocol.messages.append(Message.from_plain_bits_str(line.replace("\n", "")))
enocean_protocol.messages[-1].message_type = enocean_protocol.default_message_type
ff = FormatFinder(enocean_protocol.messages)
ff.perform_iteration()
message_types = ff.message_types
self.assertEqual(len(message_types), 1)
preamble = message_types[0].get_first_label_with_type(FieldType.Function.PREAMBLE)
self.assertEqual(preamble.start, 0)
self.assertEqual(preamble.length, 8)
sync = message_types[0].get_first_label_with_type(FieldType.Function.SYNC)
self.assertEqual(sync.start, 8)
self.assertEqual(sync.length, 4)
checksum = message_types[0].get_first_label_with_type(FieldType.Function.CHECKSUM)
self.assertEqual(checksum.start, 56)
self.assertEqual(checksum.length, 4)
def test_type_part_already_labeled(self):
protocol = self.__prepare_simple_example_protocol()
self.clear_message_types(protocol.messages)
ff = FormatFinder(protocol.messages)
# overlaps type
ff.message_types[0].add_protocol_label_start_length(32, 8)
ff.perform_iteration()
self.assertEqual(1, len(ff.message_types))
self.assertIsNotNone(ff.message_types[0].get_first_label_with_type(FieldType.Function.PREAMBLE))
self.assertIsNotNone(ff.message_types[0].get_first_label_with_type(FieldType.Function.SYNC))
self.assertIsNotNone(ff.message_types[0].get_first_label_with_type(FieldType.Function.LENGTH))
self.assertIsNotNone(ff.message_types[0].get_first_label_with_type(FieldType.Function.DST_ADDRESS))
self.assertIsNotNone(ff.message_types[0].get_first_label_with_type(FieldType.Function.SRC_ADDRESS))
pg = ProtocolGenerator([mb.message_type],
syncs_by_mt={mb.message_type: "0x9a9d"}, sequence_number_increment=64)
for i in range(num_messages):
pg.generate_message(data="0xcafe")
self.save_protocol("16bit_seq", pg)
bitvectors = FormatFinder.get_bitvectors_from_messages(pg.protocol.messages, sync_ends=[24]*num_messages)
seq_engine = SequenceNumberEngine(bitvectors, n_gram_length=8)
highscored_ranges = seq_engine.find()
self.assertEqual(len(highscored_ranges), 1)
self.clear_message_types(pg.protocol.messages)
ff = FormatFinder(pg.protocol.messages)
ff.perform_iteration()
self.assertEqual(len(ff.message_types), 1)
self.assertGreater(len(ff.message_types[0]), 0)
self.assertEqual(ff.message_types[0].num_labels_with_type(FieldType.Function.SEQUENCE_NUMBER), 1)
label = ff.message_types[0].get_first_label_with_type(FieldType.Function.SEQUENCE_NUMBER)
self.assertEqual(label.start, 24)
self.assertEqual(label.length, 16)
def run_format_finder_for_protocol(protocol: ProtocolAnalyzer):
ff = FormatFinder(protocol.messages)
ff.known_participant_addresses.clear()
ff.run()
for msg_type, indices in ff.existing_message_types.items():
for i in indices:
protocol.messages[i].message_type = msg_type
def test_create_message_types_1(self):
rng1 = CommonRange(0, 8, "1" * 8, score=1, field_type="Length")
rng1.message_indices = {0, 1, 2}
rng2 = CommonRange(8, 8, "1" * 8, score=1, field_type="Address")
rng2.message_indices = {0, 1, 2}
message_types = FormatFinder.create_common_range_containers({rng1, rng2})
self.assertEqual(len(message_types), 1)
expected = CommonRangeContainer([rng1, rng2], message_indices={0, 1, 2})
self.assertEqual(message_types[0], expected)
def test_length_part_already_labeled(self):
protocol = self.__prepare_simple_example_protocol()
self.clear_message_types(protocol.messages)
ff = FormatFinder(protocol.messages)
# overlaps length
ff.message_types[0].add_protocol_label_start_length(24, 8)
ff.perform_iteration()
self.assertEqual(1, len(ff.message_types))
self.assertIsNotNone(ff.message_types[0].get_first_label_with_type(FieldType.Function.PREAMBLE))
self.assertIsNotNone(ff.message_types[0].get_first_label_with_type(FieldType.Function.SYNC))
self.assertIsNone(ff.message_types[0].get_first_label_with_type(FieldType.Function.LENGTH))
self.assertIsNotNone(ff.message_types[0].get_first_label_with_type(FieldType.Function.DST_ADDRESS))
self.assertIsNotNone(ff.message_types[0].get_first_label_with_type(FieldType.Function.SRC_ADDRESS))
syncs_by_mt={mb.message_type: "0x1337"},
participants=[alice, bob])
for i in range(num_messages):
if i % 2 == 0:
source, destination = alice, bob
else:
source, destination = bob, alice
pg.generate_message(data="", source=source, destination=destination)
self.save_protocol("protocol_1", pg)
# Delete message type information -> no prior knowledge
self.clear_message_types(pg.protocol.messages)
ff = FormatFinder(pg.protocol.messages)
ff.known_participant_addresses.clear()
ff.perform_iteration()
self.assertEqual(len(ff.message_types), 1)
self.assertEqual(ff.message_types[0].num_labels_with_type(FieldType.Function.SEQUENCE_NUMBER), 0)
def auto_assign_labels(self):
from urh.awre.FormatFinder import FormatFinder
format_finder = FormatFinder(self.messages)
format_finder.run(max_iterations=10)
self.message_types[:] = format_finder.message_types
for msg_type, indices in format_finder.existing_message_types.items():
for i in indices:
self.messages[i].message_type = msg_type