Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_without_preamble(self):
alice = Participant("Alice", address_hex="24")
broadcast = Participant("Broadcast", address_hex="ff")
mb = MessageTypeBuilder("data")
mb.add_label(FieldType.Function.SYNC, 16)
mb.add_label(FieldType.Function.LENGTH, 8)
mb.add_label(FieldType.Function.SRC_ADDRESS, 8)
mb.add_label(FieldType.Function.SEQUENCE_NUMBER, 8)
pg = ProtocolGenerator([mb.message_type],
syncs_by_mt={mb.message_type: "0x8e88"},
preambles_by_mt={mb.message_type: "10" * 8},
participants=[alice, broadcast])
for i in range(20):
data_bits = 16 if i % 2 == 0 else 32
source = pg.participants[i % 2]
destination = pg.participants[(i + 1) % 2]
pg.generate_message(data="1010" * (data_bits // 4), source=source, destination=destination)
self.save_protocol("without_preamble", pg)def test_simple_protocol(self):
"""
Test a simple protocol with
preamble, sync and increasing sequence number (8 bit) and some constant data
:return:
"""
mb = MessageTypeBuilder("simple_seq_test")
mb.add_label(FieldType.Function.PREAMBLE, 8)
mb.add_label(FieldType.Function.SYNC, 16)
mb.add_label(FieldType.Function.SEQUENCE_NUMBER, 8)
num_messages = 20
pg = ProtocolGenerator([mb.message_type],
syncs_by_mt={mb.message_type: "0x9a9d"})
for i in range(num_messages):
pg.generate_message(data="0xcafe")
self.save_protocol("simple_sequence_number", pg)
self.clear_message_types(pg.protocol.messages)
ff = FormatFinder(pg.protocol.messages)def generate_homematic(self, num_messages: int, save_protocol=True):
mb_m_frame = MessageTypeBuilder("mframe")
mb_c_frame = MessageTypeBuilder("cframe")
mb_r_frame = MessageTypeBuilder("rframe")
mb_a_frame = MessageTypeBuilder("aframe")
participants = [Participant("CCU", address_hex="3927cc"), Participant("Switch", address_hex="3101cc")]
checksum = GenericCRC.from_standard_checksum("CRC16 CC1101")
for mb_builder in [mb_m_frame, mb_c_frame, mb_r_frame, mb_a_frame]:
mb_builder.add_label(FieldType.Function.PREAMBLE, 32)
mb_builder.add_label(FieldType.Function.SYNC, 32)
mb_builder.add_label(FieldType.Function.LENGTH, 8)
mb_builder.add_label(FieldType.Function.SEQUENCE_NUMBER, 8)
mb_builder.add_label(FieldType.Function.TYPE, 16)
mb_builder.add_label(FieldType.Function.SRC_ADDRESS, 24)
mb_builder.add_label(FieldType.Function.DST_ADDRESS, 24)
if mb_builder.name == "mframe":
mb_builder.add_label(FieldType.Function.DATA, 16, name="command")
elif mb_builder.name == "cframe":
mb_builder.add_label(FieldType.Function.DATA, 16 * 4, name="command+challenge+magic")
elif mb_builder.name == "rframe":
mb_builder.add_label(FieldType.Function.DATA, 32 * 4, name="cipher")
elif mb_builder.name == "aframe":
mb_builder.add_label(FieldType.Function.DATA, 10 * 4, name="command + auth")
mb_builder.add_checksum_label(16, checksum)
message_types = [mb_m_frame.message_type, mb_c_frame.message_type, mb_r_frame.message_type,
mb_a_frame.message_type]
preamble = "0xaaaaaaaa"
sync = "0xe9cae9ca"
initial_sequence_number = 36def generate_homematic(self, num_messages: int, save_protocol=True):
mb_m_frame = MessageTypeBuilder("mframe")
mb_c_frame = MessageTypeBuilder("cframe")
mb_r_frame = MessageTypeBuilder("rframe")
mb_a_frame = MessageTypeBuilder("aframe")
participants = [Participant("CCU", address_hex="3927cc"), Participant("Switch", address_hex="3101cc")]
checksum = GenericCRC.from_standard_checksum("CRC16 CC1101")
for mb_builder in [mb_m_frame, mb_c_frame, mb_r_frame, mb_a_frame]:
mb_builder.add_label(FieldType.Function.PREAMBLE, 32)
mb_builder.add_label(FieldType.Function.SYNC, 32)
mb_builder.add_label(FieldType.Function.LENGTH, 8)
mb_builder.add_label(FieldType.Function.SEQUENCE_NUMBER, 8)
mb_builder.add_label(FieldType.Function.TYPE, 16)
mb_builder.add_label(FieldType.Function.SRC_ADDRESS, 24)
mb_builder.add_label(FieldType.Function.DST_ADDRESS, 24)
if mb_builder.name == "mframe":
mb_builder.add_label(FieldType.Function.DATA, 16, name="command")
elif mb_builder.name == "cframe":
mb_builder.add_label(FieldType.Function.DATA, 16 * 4, name="command+challenge+magic")
elif mb_builder.name == "rframe":
mb_builder.add_label(FieldType.Function.DATA, 32 * 4, name="cipher")
elif mb_builder.name == "aframe":
mb_builder.add_label(FieldType.Function.DATA, 10 * 4, name="command + auth")
mb_builder.add_checksum_label(16, checksum)
message_types = [mb_m_frame.message_type, mb_c_frame.message_type, mb_r_frame.message_type,
mb_a_frame.message_type]self.assertTrue(all(ack_msg in ff.existing_message_types[ack_message_type] for ack_msg in ack_messages))
for mt in ff.message_types:
preamble = mt.get_first_label_with_type(FieldType.Function.PREAMBLE)
self.assertEqual(preamble.start, 0)
self.assertEqual(preamble.length, 32)
sync = mt.get_first_label_with_type(FieldType.Function.SYNC)
self.assertEqual(sync.start, 32)
self.assertEqual(sync.length, 32)
length = mt.get_first_label_with_type(FieldType.Function.LENGTH)
self.assertEqual(length.start, 64)
self.assertEqual(length.length, 8)
dst = mt.get_first_label_with_type(FieldType.Function.DST_ADDRESS)
self.assertEqual(dst.length, 24)
if mt == ack_message_type or 1 in ff.existing_message_types[mt]:
self.assertEqual(dst.start, 72)
else:
self.assertEqual(dst.start, 88)
if mt != ack_message_type and 1 not in ff.existing_message_types[mt]:
src = mt.get_first_label_with_type(FieldType.Function.SRC_ADDRESS)
self.assertEqual(src.start, 112)
self.assertEqual(src.length, 24)
crc = mt.get_first_label_with_type(FieldType.Function.CHECKSUM)
self.assertIsNotNone(crc)matches.append(candidate)
if not matches:
candidate = None
break
else:
candidate = Interval.find_greatest(matches)
if candidate:
common_intervals_by_type[message_type].append(candidate)
# Now we have the common intervals and need to check which one is the length
for message_type, intervals in common_intervals_by_type.items():
assert isinstance(message_type, MessageType)
# Exclude Synchronization (or preamble if not present) from length calculation
sync_lbl = self.find_lbl_function_in(FieldType.Function.SYNC, message_type)
if sync_lbl:
sync_len = self.__nbits2bytes(sync_lbl.end)
else:
preamble_lbl = self.find_lbl_function_in(FieldType.Function.PREAMBLE, message_type)
sync_len = self.__nbits2bytes(preamble_lbl.end) if preamble_lbl is not None else 0
scores = defaultdict(int)
weights = {-4: 1, -3: 2, -2: 3, -1: 4, 0: 5}
for common_interval in intervals:
for msg in messages_by_type[message_type]:
bits = msg.decoded_bits
byte_len = self.__nbits2bytes(len(bits)) - sync_len
start, end = common_interval.start, common_interval.end
for byte_start in range(start, end, 8):
byte_end = byte_start + 8 if byte_start + 8 <= end else enddef field_type(self, val: FieldType):
if val is None:
return
if self.is_checksum_label and val.function != FieldType.Function.CHECKSUM:
assert isinstance(self.label, ChecksumLabel)
self.label = self.label.to_label(val)
elif not self.is_checksum_label and val.function == FieldType.Function.CHECKSUM:
self.label = ChecksumLabel.from_label(self.label)
self.value_type_index = 0
self.label.field_type = valdef is_preamble(self) -> bool:
return self.field_type is not None and self.field_type.function == FieldType.Function.PREAMBLEfrom urh.signalprocessing.Message import Message
from urh.signalprocessing.MessageType import MessageType
from urh.signalprocessing.ProtocoLabel import ProtocolLabel
from urh.simulator.SimulatorProtocolLabel import SimulatorProtocolLabel
from urh.ui.delegates.CheckBoxDelegate import CheckBoxDelegate
from urh.ui.delegates.ComboBoxDelegate import ComboBoxDelegate
from urh.ui.delegates.SpinBoxDelegate import SpinBoxDelegate
from urh.ui.ui_properties_dialog import Ui_DialogLabels
from urh.util import util
from urh.util.Logger import logger
class ProtocolLabelDialog(QDialog):
apply_decoding_changed = pyqtSignal(ProtocolLabel, MessageType)
SPECIAL_CONFIG_TYPES = [FieldType.Function.CHECKSUM]
def __init__(self, message: Message, viewtype: int, selected_index=None, parent=None):
super().__init__(parent)
self.ui = Ui_DialogLabels()
self.ui.setupUi(self)
util.set_splitter_stylesheet(self.ui.splitter)
field_types = FieldType.load_from_xml()
self.model = PLabelTableModel(message, field_types)
self.ui.tblViewProtoLabels.setItemDelegateForColumn(0, ComboBoxDelegate([ft.caption for ft in field_types],
is_editable=True,
return_index=False, parent=self))
self.ui.tblViewProtoLabels.setItemDelegateForColumn(1, SpinBoxDelegate(1, len(message), self))
self.ui.tblViewProtoLabels.setItemDelegateForColumn(2, SpinBoxDelegate(1, len(message), self))
self.ui.tblViewProtoLabels.setItemDelegateForColumn(3,def get_raw_preamble_positions(self) -> np.ndarray:
"""
Return a 2D numpy array where first column is the start of preamble
second and third columns are lower and upper bound for preamble length by message, respectively
"""
result = np.zeros((len(self.bitvectors), 3), dtype=int)
for i, bitvector in enumerate(self.bitvectors):
if i in self.existing_message_types:
preamble_label = self.existing_message_types[i].get_first_label_with_type(FieldType.Function.PREAMBLE)
else:
preamble_label = None
if preamble_label is None:
start, lower, upper = self.get_raw_preamble_position(bitvector)
else:
# If this message is already labeled with a preamble we just use it's values
start, lower, upper = preamble_label.start, preamble_label.end, preamble_label.end
result[i, 0] = start
result[i, 1] = lower - start
result[i, 2] = upper - start
return result