Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_without_preamble(self):
    """
    Generate a protocol whose message type declares no preamble label and save it.

    The message type consists of sync (16 bit), length (8 bit), source address
    (8 bit) and sequence number (8 bit). Twenty messages are generated; the two
    participants swap the source/destination roles on every message and the
    payload alternates between 16 and 32 bits.
    """
    alice = Participant("Alice", address_hex="24")
    broadcast = Participant("Broadcast", address_hex="ff")

    mb = MessageTypeBuilder("data")
    for function, bit_length in (
        (FieldType.Function.SYNC, 16),
        (FieldType.Function.LENGTH, 8),
        (FieldType.Function.SRC_ADDRESS, 8),
        (FieldType.Function.SEQUENCE_NUMBER, 8),
    ):
        mb.add_label(function, bit_length)

    # Only the PREAMBLE *label* is absent from the message type; the generator
    # is still given a preamble bit pattern through preambles_by_mt.
    pg = ProtocolGenerator(
        [mb.message_type],
        syncs_by_mt={mb.message_type: "0x8e88"},
        preambles_by_mt={mb.message_type: "10" * 8},
        participants=[alice, broadcast],
    )

    for index in range(20):
        # Even messages carry 16 payload bits, odd messages 32.
        data_bits = 32 if index % 2 else 16
        source = pg.participants[index % 2]
        destination = pg.participants[(index + 1) % 2]
        pg.generate_message(data="1010" * (data_bits // 4), source=source, destination=destination)

    self.save_protocol("without_preamble", pg)
def test_simple_protocol(self):
    """
    Test a simple protocol with
    preamble, sync and increasing sequence number (8 bit) and some constant data

    The generated protocol is saved as "simple_sequence_number", the message
    types of its messages are cleared and the messages are handed to a
    FormatFinder.

    :return:
    """
    mb = MessageTypeBuilder("simple_seq_test")
    for function, bit_length in (
        (FieldType.Function.PREAMBLE, 8),
        (FieldType.Function.SYNC, 16),
        (FieldType.Function.SEQUENCE_NUMBER, 8),
    ):
        mb.add_label(function, bit_length)

    num_messages = 20

    pg = ProtocolGenerator([mb.message_type], syncs_by_mt={mb.message_type: "0x9a9d"})

    # Every message carries the same constant payload.
    for _ in range(num_messages):
        pg.generate_message(data="0xcafe")

    self.save_protocol("simple_sequence_number", pg)

    self.clear_message_types(pg.protocol.messages)
    # NOTE(review): nothing is asserted on ff within this span; the checks
    # presumably follow in code that is not part of this excerpt - confirm.
    ff = FormatFinder(pg.protocol.messages)
def generate_homematic(self, num_messages: int, save_protocol=True):
    """
    Set up the message types and constants for the Homematic test protocol.

    Four frame kinds (mframe, cframe, rframe, aframe) share the same header
    layout and differ only in the size and name of their DATA label. Every
    frame is terminated by a 16 bit checksum label using the "CRC16 CC1101"
    standard checksum.

    NOTE(review): num_messages and save_protocol are not used within this
    span, and the locals defined at the end are not consumed here; they are
    presumably used by code following this excerpt - confirm.

    :param num_messages: not used in the visible part of the method
    :param save_protocol: not used in the visible part of the method
    """
    mb_m_frame = MessageTypeBuilder("mframe")
    mb_c_frame = MessageTypeBuilder("cframe")
    mb_r_frame = MessageTypeBuilder("rframe")
    mb_a_frame = MessageTypeBuilder("aframe")

    participants = [Participant("CCU", address_hex="3927cc"), Participant("Switch", address_hex="3101cc")]

    checksum = GenericCRC.from_standard_checksum("CRC16 CC1101")

    # Header labels shared by all frame kinds, in protocol order: (function, length in bits).
    header_fields = (
        (FieldType.Function.PREAMBLE, 32),
        (FieldType.Function.SYNC, 32),
        (FieldType.Function.LENGTH, 8),
        (FieldType.Function.SEQUENCE_NUMBER, 8),
        (FieldType.Function.TYPE, 16),
        (FieldType.Function.SRC_ADDRESS, 24),
        (FieldType.Function.DST_ADDRESS, 24),
    )

    # DATA label per frame kind: builder name -> (length in bits, label name).
    data_field_by_frame = {
        "mframe": (16, "command"),
        "cframe": (16 * 4, "command+challenge+magic"),
        "rframe": (32 * 4, "cipher"),
        "aframe": (10 * 4, "command + auth"),
    }

    for mb_builder in [mb_m_frame, mb_c_frame, mb_r_frame, mb_a_frame]:
        for function, bit_length in header_fields:
            mb_builder.add_label(function, bit_length)

        data_field = data_field_by_frame.get(mb_builder.name)
        if data_field is not None:
            data_bits, label_name = data_field
            mb_builder.add_label(FieldType.Function.DATA, data_bits, name=label_name)

        mb_builder.add_checksum_label(16, checksum)

    message_types = [mb_m_frame.message_type, mb_c_frame.message_type, mb_r_frame.message_type,
                     mb_a_frame.message_type]

    preamble = "0xaaaaaaaa"
    sync = "0xe9cae9ca"
    initial_sequence_number = 36
def generate_homematic(self, num_messages: int, save_protocol=True):
    """
    Build the four message types (mframe, cframe, rframe, aframe) of the
    Homematic test protocol.

    All frame kinds get the same header labels, one DATA label whose length
    and name depend on the frame kind, and a trailing 16 bit checksum label
    based on the "CRC16 CC1101" standard checksum.

    NOTE(review): num_messages, save_protocol, participants and message_types
    are not consumed within this span; presumably the method continues beyond
    this excerpt - confirm.

    :param num_messages: not used in the visible part of the method
    :param save_protocol: not used in the visible part of the method
    """
    mb_m_frame = MessageTypeBuilder("mframe")
    mb_c_frame = MessageTypeBuilder("cframe")
    mb_r_frame = MessageTypeBuilder("rframe")
    mb_a_frame = MessageTypeBuilder("aframe")
    frame_builders = [mb_m_frame, mb_c_frame, mb_r_frame, mb_a_frame]

    participants = [Participant("CCU", address_hex="3927cc"), Participant("Switch", address_hex="3101cc")]

    checksum = GenericCRC.from_standard_checksum("CRC16 CC1101")

    for mb_builder in frame_builders:
        # Common header, identical for every frame kind.
        mb_builder.add_label(FieldType.Function.PREAMBLE, 32)
        mb_builder.add_label(FieldType.Function.SYNC, 32)
        mb_builder.add_label(FieldType.Function.LENGTH, 8)
        mb_builder.add_label(FieldType.Function.SEQUENCE_NUMBER, 8)
        mb_builder.add_label(FieldType.Function.TYPE, 16)
        mb_builder.add_label(FieldType.Function.SRC_ADDRESS, 24)
        mb_builder.add_label(FieldType.Function.DST_ADDRESS, 24)

        # Frame specific payload: (length in bits, label name).
        if mb_builder.name == "mframe":
            data_field = (16, "command")
        elif mb_builder.name == "cframe":
            data_field = (16 * 4, "command+challenge+magic")
        elif mb_builder.name == "rframe":
            data_field = (32 * 4, "cipher")
        elif mb_builder.name == "aframe":
            data_field = (10 * 4, "command + auth")
        else:
            data_field = None

        if data_field is not None:
            mb_builder.add_label(FieldType.Function.DATA, data_field[0], name=data_field[1])

        mb_builder.add_checksum_label(16, checksum)

    message_types = [builder.message_type for builder in frame_builders]
self.assertTrue(all(ack_msg in ff.existing_message_types[ack_message_type] for ack_msg in ack_messages))
for mt in ff.message_types:
preamble = mt.get_first_label_with_type(FieldType.Function.PREAMBLE)
self.assertEqual(preamble.start, 0)
self.assertEqual(preamble.length, 32)
sync = mt.get_first_label_with_type(FieldType.Function.SYNC)
self.assertEqual(sync.start, 32)
self.assertEqual(sync.length, 32)
length = mt.get_first_label_with_type(FieldType.Function.LENGTH)
self.assertEqual(length.start, 64)
self.assertEqual(length.length, 8)
dst = mt.get_first_label_with_type(FieldType.Function.DST_ADDRESS)
self.assertEqual(dst.length, 24)
if mt == ack_message_type or 1 in ff.existing_message_types[mt]:
self.assertEqual(dst.start, 72)
else:
self.assertEqual(dst.start, 88)
if mt != ack_message_type and 1 not in ff.existing_message_types[mt]:
src = mt.get_first_label_with_type(FieldType.Function.SRC_ADDRESS)
self.assertEqual(src.start, 112)
self.assertEqual(src.length, 24)
crc = mt.get_first_label_with_type(FieldType.Function.CHECKSUM)
self.assertIsNotNone(crc)
matches.append(candidate)
if not matches:
candidate = None
break
else:
candidate = Interval.find_greatest(matches)
if candidate:
common_intervals_by_type[message_type].append(candidate)
# Now we have the common intervals and need to check which one is the length
for message_type, intervals in common_intervals_by_type.items():
assert isinstance(message_type, MessageType)
# Exclude Synchronization (or preamble if not present) from length calculation
sync_lbl = self.find_lbl_function_in(FieldType.Function.SYNC, message_type)
if sync_lbl:
sync_len = self.__nbits2bytes(sync_lbl.end)
else:
preamble_lbl = self.find_lbl_function_in(FieldType.Function.PREAMBLE, message_type)
sync_len = self.__nbits2bytes(preamble_lbl.end) if preamble_lbl is not None else 0
scores = defaultdict(int)
weights = {-4: 1, -3: 2, -2: 3, -1: 4, 0: 5}
for common_interval in intervals:
for msg in messages_by_type[message_type]:
bits = msg.decoded_bits
byte_len = self.__nbits2bytes(len(bits)) - sync_len
start, end = common_interval.start, common_interval.end
for byte_start in range(start, end, 8):
byte_end = byte_start + 8 if byte_start + 8 <= end else end
def field_type(self, val: FieldType):
    """
    Assign a new field type to the wrapped label.

    When the checksum role changes, the label object itself is replaced first:
    a checksum label that receives a non-checksum field type is converted with
    ``to_label``, and a non-checksum label that receives a checksum field type
    is converted with ``ChecksumLabel.from_label`` (which also resets
    ``value_type_index`` to 0). ``None`` is ignored.

    NOTE(review): presumably the setter of a ``field_type`` property; the
    decorator is not part of this span - confirm.

    :param val: field type to assign, or None to leave the label untouched
    """
    if val is None:
        return

    wants_checksum = val.function == FieldType.Function.CHECKSUM

    if self.is_checksum_label:
        if not wants_checksum:
            assert isinstance(self.label, ChecksumLabel)
            self.label = self.label.to_label(val)
    elif wants_checksum:
        self.label = ChecksumLabel.from_label(self.label)
        self.value_type_index = 0

    self.label.field_type = val
def is_preamble(self) -> bool:
    """
    Tell whether the field type of this object has the PREAMBLE function.

    :return: False when no field type is set, otherwise the result of comparing
             the field type's function with FieldType.Function.PREAMBLE
    """
    field_type = self.field_type
    if field_type is None:
        return False
    return field_type.function == FieldType.Function.PREAMBLE
from urh.signalprocessing.Message import Message
from urh.signalprocessing.MessageType import MessageType
from urh.signalprocessing.ProtocoLabel import ProtocolLabel
from urh.simulator.SimulatorProtocolLabel import SimulatorProtocolLabel
from urh.ui.delegates.CheckBoxDelegate import CheckBoxDelegate
from urh.ui.delegates.ComboBoxDelegate import ComboBoxDelegate
from urh.ui.delegates.SpinBoxDelegate import SpinBoxDelegate
from urh.ui.ui_properties_dialog import Ui_DialogLabels
from urh.util import util
from urh.util.Logger import logger
class ProtocolLabelDialog(QDialog):
# Dialog built on Ui_DialogLabels that shows the protocol labels of a message in a table view.
# NOTE(review): this excerpt stops in the middle of __init__ (the call on the last line is unfinished).
# Signal carrying a ProtocolLabel and a MessageType.
apply_decoding_changed = pyqtSignal(ProtocolLabel, MessageType)
# NOTE(review): presumably the field functions that need a dedicated configuration UI - confirm against usages.
SPECIAL_CONFIG_TYPES = [FieldType.Function.CHECKSUM]
def __init__(self, message: Message, viewtype: int, selected_index=None, parent=None):
super().__init__(parent)
self.ui = Ui_DialogLabels()
self.ui.setupUi(self)
util.set_splitter_stylesheet(self.ui.splitter)
# Field types are read via FieldType.load_from_xml(); they feed both the table model and the combo box of column 0.
field_types = FieldType.load_from_xml()
self.model = PLabelTableModel(message, field_types)
# Column 0: editable combo box listing the captions of all loaded field types.
self.ui.tblViewProtoLabels.setItemDelegateForColumn(0, ComboBoxDelegate([ft.caption for ft in field_types],
is_editable=True,
return_index=False, parent=self))
# Columns 1 and 2: spin boxes created with 1 and len(message) (presumably minimum and maximum - confirm).
self.ui.tblViewProtoLabels.setItemDelegateForColumn(1, SpinBoxDelegate(1, len(message), self))
self.ui.tblViewProtoLabels.setItemDelegateForColumn(2, SpinBoxDelegate(1, len(message), self))
self.ui.tblViewProtoLabels.setItemDelegateForColumn(3,
def get_raw_preamble_positions(self) -> np.ndarray:
    """
    Return a 2D numpy array with one row per bitvector where the first column
    is the start of the preamble and the second and third columns are the
    lower and upper bound for the preamble length, respectively.

    For a message index that is present in ``existing_message_types`` and whose
    message type already has a preamble label, the label's start and end are
    used; otherwise the position comes from ``get_raw_preamble_position``.

    :return: integer array of shape (number of bitvectors, 3)
    """
    positions = np.zeros((len(self.bitvectors), 3), dtype=int)

    for row, bitvector in enumerate(self.bitvectors):
        preamble_label = None
        if row in self.existing_message_types:
            message_type = self.existing_message_types[row]
            preamble_label = message_type.get_first_label_with_type(FieldType.Function.PREAMBLE)

        if preamble_label is not None:
            # If this message is already labeled with a preamble we just use its values;
            # the label end serves as both the lower and the upper bound.
            start = preamble_label.start
            lower = upper = preamble_label.end
        else:
            start, lower, upper = self.get_raw_preamble_position(bitvector)

        # Bounds are stored as lengths relative to the preamble start.
        positions[row] = (start, lower - start, upper - start)

    return positions