Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_without_preamble(self):
    """Generate 20 messages for a message type that has no preamble label.

    The message type only carries sync, length, source address and sequence
    number labels.  Messages alternate between 16 and 32 data bits, and the
    two participants swap the source/destination roles on every message.
    """
    participants = [
        Participant("Alice", address_hex="24"),
        Participant("Broadcast", address_hex="ff"),
    ]

    builder = MessageTypeBuilder("data")
    label_layout = (
        (FieldType.Function.SYNC, 16),
        (FieldType.Function.LENGTH, 8),
        (FieldType.Function.SRC_ADDRESS, 8),
        (FieldType.Function.SEQUENCE_NUMBER, 8),
    )
    for function, num_bits in label_layout:
        builder.add_label(function, num_bits)

    message_type = builder.message_type
    generator = ProtocolGenerator(
        [message_type],
        syncs_by_mt={message_type: "0x8e88"},
        preambles_by_mt={message_type: "10" * 8},
        participants=participants,
    )

    for index in range(20):
        # Even messages carry 16 data bits, odd ones 32.
        if index % 2 == 0:
            data_bits = 16
        else:
            data_bits = 32

        sender = generator.participants[index % 2]
        receiver = generator.participants[(index + 1) % 2]
        payload = "1010" * (data_bits // 4)
        generator.generate_message(data=payload, source=sender, destination=receiver)
def _prepare_protocol_5() -> ProtocolGenerator:
    """Build a generator with a "data" and an "ack" message type for three participants.

    Both message types use the sync word "0x9a7d" and a 16 bit "10" preamble.

    :return: the configured ProtocolGenerator (no messages are generated here)
    """
    participants = [
        Participant("Alice", address_hex="1337"),
        Participant("Bob", address_hex="beef"),
        Participant("Carl", address_hex="cafe"),
    ]

    data_builder = MessageTypeBuilder("data")
    for function, num_bits in (
        (FieldType.Function.PREAMBLE, 16),
        (FieldType.Function.SYNC, 16),
        (FieldType.Function.LENGTH, 8),
        (FieldType.Function.SRC_ADDRESS, 16),
        (FieldType.Function.DST_ADDRESS, 16),
        (FieldType.Function.SEQUENCE_NUMBER, 16),
    ):
        data_builder.add_label(function, num_bits)

    ack_builder = MessageTypeBuilder("ack")
    for function, num_bits in (
        (FieldType.Function.PREAMBLE, 16),
        (FieldType.Function.SYNC, 16),
        (FieldType.Function.LENGTH, 8),
        (FieldType.Function.DST_ADDRESS, 16),
    ):
        ack_builder.add_label(function, num_bits)

    message_types = [data_builder.message_type, ack_builder.message_type]
    return ProtocolGenerator(
        message_types,
        syncs_by_mt={message_type: "0x9a7d" for message_type in message_types},
        preambles_by_mt={message_type: "10" * 8 for message_type in message_types},
        participants=participants,
    )
def test_medium_protocol(self):
    """
    Generate messages for a protocol with two message types, of which only
    the "data" type has a length field (the "ack" type has preamble and sync only).

    Only "data" messages are generated: 5 with 8 data bits, 10 with 16 data
    bits and 5 with 32 data bits.

    :return:
    """
    data_builder = MessageTypeBuilder("data")
    for function in (
        FieldType.Function.PREAMBLE,
        FieldType.Function.SYNC,
        FieldType.Function.LENGTH,
        FieldType.Function.SEQUENCE_NUMBER,
    ):
        data_builder.add_label(function, 8)

    ack_builder = MessageTypeBuilder("ack")
    for function in (FieldType.Function.PREAMBLE, FieldType.Function.SYNC):
        ack_builder.add_label(function, 8)

    data_type = data_builder.message_type
    ack_type = ack_builder.message_type
    generator = ProtocolGenerator(
        [data_type, ack_type],
        syncs_by_mt={data_type: "11110011", ack_type: "11110011"},
    )

    # (data length in bits, number of messages), in generation order
    message_plan = ((8, 5), (16, 10), (32, 5))
    for data_length, num_messages in message_plan:
        for counter in range(num_messages):
            payload = generator.decimal_to_bits(10 * counter, data_length)
            generator.generate_message(data=payload, message_type=data_type)
# NOTE(review): this is the tail of a function whose `def` line is not part of
# this excerpt; `mb`, `checksum`, `alice` and `bob` are defined in the missing part.
# Remaining labels of the first message type: length, addresses, 8 * 8 data bits
# and a 16 bit checksum label built from `checksum`.
mb.add_label(FieldType.Function.LENGTH, 8)
mb.add_label(FieldType.Function.SRC_ADDRESS, 16)
mb.add_label(FieldType.Function.DST_ADDRESS, 16)
mb.add_label(FieldType.Function.DATA, 8 * 8)
mb.add_checksum_label(16, checksum)
# Second message type "data2": like the labels above but with a 64 * 8 bit data field.
mb2 = MessageTypeBuilder("data2")
mb2.add_label(FieldType.Function.PREAMBLE, 16)
mb2.add_label(FieldType.Function.SYNC, 16)
mb2.add_label(FieldType.Function.LENGTH, 8)
mb2.add_label(FieldType.Function.SRC_ADDRESS, 16)
mb2.add_label(FieldType.Function.DST_ADDRESS, 16)
mb2.add_label(FieldType.Function.DATA, 64 * 8)
mb2.add_checksum_label(16, checksum)
# Third message type "ack": no source address and no data field, but still checksummed.
mb_ack = MessageTypeBuilder("ack")
mb_ack.add_label(FieldType.Function.PREAMBLE, 16)
mb_ack.add_label(FieldType.Function.SYNC, 16)
mb_ack.add_label(FieldType.Function.LENGTH, 8)
mb_ack.add_label(FieldType.Function.DST_ADDRESS, 16)
mb_ack.add_checksum_label(16, checksum)
# All three message types share the same sync word and the same 16 bit "10" preamble.
mt1, mt2, mt3 = mb.message_type, mb2.message_type, mb_ack.message_type
pg = ProtocolGenerator([mt1, mt2, mt3],
syncs_by_mt={mt1: "0x9a7d", mt2: "0x9a7d", mt3: "0x9a7d"},
preambles_by_mt={mt1: "10" * 8, mt2: "10" * 8, mt3: "10" * 8},
participants=[alice, bob])
return pg
# Sets up a single message type and a generator for the two participants
# self.alice and self.bob, then starts a loop over 50 messages.
# NOTE(review): the excerpt ends inside the loop; only the even-index branch
# (alice -> bob, data_length = 8) is visible, the rest of the body is missing.
def test_two_participants(self):
# Message type: 8 bit preamble, 16 bit sync, 8 bit length, 16 bit source and destination address.
mb = MessageTypeBuilder("address_two_participants")
mb.add_label(FieldType.Function.PREAMBLE, 8)
mb.add_label(FieldType.Function.SYNC, 16)
mb.add_label(FieldType.Function.LENGTH, 8)
mb.add_label(FieldType.Function.SRC_ADDRESS, 16)
mb.add_label(FieldType.Function.DST_ADDRESS, 16)
num_messages = 50
# No preambles_by_mt is passed here, only the sync word.
pg = ProtocolGenerator([mb.message_type],
syncs_by_mt={mb.message_type: "0x9a9d"},
participants=[self.alice, self.bob])
for i in range(num_messages):
# Even-numbered messages go from alice to bob.
if i % 2 == 0:
source, destination = self.alice, self.bob
data_length = 8
def _prepare_protocol_1() -> ProtocolGenerator:
    """Build a generator with one message type shared by two participants.

    The message type has preamble, sync, length, source address, destination
    address and sequence number labels and uses the sync word "0x1337".

    :return: the configured ProtocolGenerator (no messages are generated here)
    """
    participants = [
        Participant("Alice", address_hex="dead"),
        Participant("Bob", address_hex="beef"),
    ]

    builder = MessageTypeBuilder("protocol_with_one_message_type")
    label_layout = (
        (FieldType.Function.PREAMBLE, 8),
        (FieldType.Function.SYNC, 16),
        (FieldType.Function.LENGTH, 8),
        (FieldType.Function.SRC_ADDRESS, 16),
        (FieldType.Function.DST_ADDRESS, 16),
        (FieldType.Function.SEQUENCE_NUMBER, 8),
    )
    for function, num_bits in label_layout:
        builder.add_label(function, num_bits)

    message_type = builder.message_type
    return ProtocolGenerator(
        [message_type],
        syncs_by_mt={message_type: "0x1337"},
        participants=participants,
    )
def build_protocol_generator(preamble_syncs: list, num_messages: tuple, data: tuple) -> ProtocolGenerator:
    """Build a ProtocolGenerator with one message type per (preamble, sync) pair.

    Each message type gets a preamble label and a sync label whose lengths are
    the bit lengths of the converted preamble / sync word.  Afterwards the
    requested number of messages is generated for every message type.

    :param preamble_syncs: list of (preamble, sync_word) string pairs; each
        value is converted with ProtocolGenerator.to_bits
    :param num_messages: number of messages to generate per message type
    :param data: payload per message type; either a callable that receives the
        message index and returns a number (encoded with 8 bits), or a value
        that is passed to generate_message unchanged
    :return: the generator holding the generated messages
    :raises AssertionError: if the three arguments differ in length or a
        preamble / sync word is not a string
    """
    message_types = []
    preambles_by_mt = {}
    syncs_by_mt = {}

    assert len(preamble_syncs) == len(num_messages) == len(data)

    for i, (preamble, sync_word) in enumerate(preamble_syncs):
        assert isinstance(preamble, str)
        assert isinstance(sync_word, str)

        preamble, sync_word = map(ProtocolGenerator.to_bits, (preamble, sync_word))

        mb = MessageTypeBuilder("message type #{0}".format(i))
        mb.add_label(FieldType.Function.PREAMBLE, len(preamble))
        mb.add_label(FieldType.Function.SYNC, len(sync_word))

        message_types.append(mb.message_type)
        preambles_by_mt[mb.message_type] = preamble
        syncs_by_mt[mb.message_type] = sync_word

    pg = ProtocolGenerator(message_types, preambles_by_mt=preambles_by_mt, syncs_by_mt=syncs_by_mt)

    # The three sequences have equal length (asserted above), so zip pairs every
    # message type with its message count and payload source.
    for msg_type, count, payload in zip(message_types, num_messages, data):
        for j in range(count):
            if callable(payload):
                msg_data = pg.decimal_to_bits(payload(j), num_bits=8)
            else:
                msg_data = payload

            pg.generate_message(message_type=msg_type, data=msg_data)

    # Without this return the function would hand back None despite its annotation.
    return pg
# Sets up a "data" and an "ack" message type plus a generator for Alice and Bob,
# then starts a loop over 50 messages.
# NOTE(review): annotated to return a ProtocolAnalyzer, but the excerpt ends
# inside the loop, so neither the message generation nor the return is visible.
def __prepare_example_protocol(self) -> ProtocolAnalyzer:
alice = Participant("Alice", "A", address_hex="1234")
bob = Participant("Bob", "B", address_hex="cafe")
# "data" type: note the destination address label comes before the source address label.
mb = MessageTypeBuilder("data")
mb.add_label(FieldType.Function.PREAMBLE, 8)
mb.add_label(FieldType.Function.SYNC, 16)
mb.add_label(FieldType.Function.LENGTH, 8)
mb.add_label(FieldType.Function.TYPE, 8)
mb.add_label(FieldType.Function.DST_ADDRESS, 16)
mb.add_label(FieldType.Function.SRC_ADDRESS, 16)
# "ack" type: no type label and no source address label.
mb_ack = MessageTypeBuilder("ack")
mb_ack.add_label(FieldType.Function.PREAMBLE, 8)
mb_ack.add_label(FieldType.Function.SYNC, 16)
mb_ack.add_label(FieldType.Function.LENGTH, 8)
mb_ack.add_label(FieldType.Function.DST_ADDRESS, 16)
num_messages = 50
# Both message types use the same sync word.
pg = ProtocolGenerator([mb.message_type, mb_ack.message_type],
syncs_by_mt={mb.message_type: "0x6768", mb_ack.message_type: "0x6768"},
participants=[alice, bob])
# Fixed seed; presumably so random values drawn in the (not visible) rest of the
# loop are reproducible -- TODO confirm.
random.seed(0)
for i in range(num_messages):
# Even-numbered messages go from alice to bob.
if i % 2 == 0:
source, destination = alice, bob
data_length = 8
# Sets up a "data" and an "ack" message type for Alice and Bob and begins
# constructing the ProtocolGenerator.
# NOTE(review): the excerpt is cut off inside the ProtocolGenerator(...) call --
# the closing parenthesis, any further arguments and the return are missing.
def _prepare_protocol_3() -> ProtocolGenerator:
alice = Participant("Alice", address_hex="1337")
bob = Participant("Bob", address_hex="beef")
# "data" type: preamble, sync, length, both addresses and a 16 bit sequence number.
mb = MessageTypeBuilder("data")
mb.add_label(FieldType.Function.PREAMBLE, 16)
mb.add_label(FieldType.Function.SYNC, 16)
mb.add_label(FieldType.Function.LENGTH, 8)
mb.add_label(FieldType.Function.SRC_ADDRESS, 16)
mb.add_label(FieldType.Function.DST_ADDRESS, 16)
mb.add_label(FieldType.Function.SEQUENCE_NUMBER, 16)
# "ack" type: preamble, sync, length and destination address only.
mb_ack = MessageTypeBuilder("ack")
mb_ack.add_label(FieldType.Function.PREAMBLE, 16)
mb_ack.add_label(FieldType.Function.SYNC, 16)
mb_ack.add_label(FieldType.Function.LENGTH, 8)
mb_ack.add_label(FieldType.Function.DST_ADDRESS, 16)
# Both message types share the sync word "0x9a7d" and a 16 bit "10" preamble.
pg = ProtocolGenerator([mb.message_type, mb_ack.message_type],
syncs_by_mt={mb.message_type: "0x9a7d", mb_ack.message_type: "0x9a7d"},
preambles_by_mt={mb.message_type: "10" * 8, mb_ack.message_type: "10" * 8},
# NOTE(review): tail of a method whose `def` line is not part of this excerpt;
# `bits`, `mt`, `source` and `checksum_labels` come from the missing part.
# Join the collected bit strings into one message and tag it with its type and sender.
msg = Message.from_plain_bits_str("".join(bits))
msg.message_type = mt
msg.participant = source
# Advance the per-message-type sequence counter for the next message of this type.
self.sequence_numbers[mt] += self.sequence_number_increment
# Overwrite each checksum label's bit range with the checksum computed over the message.
# (The meaning of the second argument, False, is not visible here.)
for checksum_label in checksum_labels:
msg[checksum_label.start:checksum_label.end] = checksum_label.calculate_checksum_for_message(msg, False)
self.protocol.messages.append(msg)
def to_file(self, filename: str):
    """Write the protocol and its participants to *filename* via ``to_xml_file``.

    :param filename: path of the file to write
    """
    protocol = self.protocol
    protocol.to_xml_file(
        filename,
        [],
        self.participants,
        write_bits=True,
    )
if __name__ == '__main__':
    # Small demo: build one message type, generate three messages and write
    # the result to /tmp/test.proto.
    mb = MessageTypeBuilder("test")
    for function, num_bits in (
        (FieldType.Function.PREAMBLE, 8),
        (FieldType.Function.SYNC, 4),
        (FieldType.Function.LENGTH, 8),
        (FieldType.Function.SEQUENCE_NUMBER, 16),
        (FieldType.Function.SRC_ADDRESS, 16),
        (FieldType.Function.DST_ADDRESS, 16),
    ):
        mb.add_label(function, num_bits)

    pg = ProtocolGenerator([mb.message_type], [], little_endian=False)

    # Two messages without participants, 8 and 16 data bits long.
    for payload in ("1" * 8, "1" * 16):
        pg.generate_message(data=payload)

    # One message with hex data and explicit source / destination.
    pg.generate_message(
        data="0xab",
        source=Participant("Alice", "A", "1234"),
        destination=Participant("Bob", "B", "4567"),
    )

    pg.to_file("/tmp/test.proto")