Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_paper_example(self):
    """Build the two-participant, four-message example and create an AddressEngine.

    Alice is the sender of the two messages starting with ``aabb`` and Bob of
    the two starting with ``bbaa``. The protocol is saved under the name
    "paper_example" before bit and hex vectors are derived from its messages.
    """
    alice = Participant("Alice", "A")
    bob = Participant("Bob", "B")
    participants = [alice, bob]

    # (hex payload, sender) for every message, in protocol order.
    fixtures = [
        ("aabb1234", alice),
        ("aabb6789", alice),
        ("bbaa4711", bob),
        ("bbaa1337", bob),
    ]
    messages = []
    for hex_payload, sender in fixtures:
        message = Message.from_plain_hex_str(hex_payload)
        message.participant = sender
        messages.append(message)

    protocol = ProtocolAnalyzer(None)
    protocol.messages.extend(messages)
    self.save_protocol("paper_example", protocol)

    bitvectors = FormatFinder.get_bitvectors_from_messages(protocol.messages)
    hexvectors = FormatFinder.get_hexvectors(bitvectors)

    # The engine receives, per message, the position of its sender in `participants`.
    sender_indices = [participants.index(msg.participant) for msg in protocol.messages]
    # NOTE(review): nothing in the visible snippet uses or asserts on the engine.
    address_engine = AddressEngine(hexvectors, participant_indices=sender_indices)
def test_crc_widget_in_protocol_label_dialog(self):
    """Check that a single checksum label yields exactly one advanced-settings tab.

    The tab is expected to carry the label's name, "test_crc".
    """
    message_type = MessageType("test")
    crc_field_type = FieldType("test_crc", FieldType.Function.CHECKSUM)
    message_type.append(ChecksumLabel("test_crc", 8, 16, 0, crc_field_type))

    # A 100-bit all-zero message carrying the message type defined above.
    message = Message([0] * 100, 0, message_type)
    self.dialog = ProtocolLabelController(0, message, 0)

    advanced_tabs = self.dialog.ui.tabWidgetAdvancedSettings
    self.assertEqual(advanced_tabs.count(), 1)
    self.assertEqual(advanced_tabs.tabText(0), "test_crc")
def __build_protocol(self):
    """Return a signal-less ProtocolAnalyzer filled with identical all-True messages.

    ``NUM_MESSAGES`` messages are created; each has ``BITS_PER_MESSAGE`` bits
    set to True, a pause of 1000 and the analyzer's default message type.
    """
    protocol = ProtocolAnalyzer(signal=None)
    for _ in range(self.NUM_MESSAGES):
        protocol.messages.append(
            Message(
                [True] * self.BITS_PER_MESSAGE,
                pause=1000,
                message_type=protocol.default_message_type,
            )
        )
    return protocol
def setUp(self):
    """Prepare the default field types and two protocols loaded from data files.

    ``self.protocol`` receives one message per line of
    "awre_consistent_addresses.txt", each assigned to Alice or Bob;
    ``self.zero_crc_protocol`` receives one message per line of
    "awre_zeroed_crc.txt".
    """
    self.field_types = FieldType.default_field_types()

    # Look up one field type per function from the default set.
    find = self.__field_type_with_function
    functions = FieldType.Function
    self.preamble_field_type = find(self.field_types, functions.PREAMBLE)
    self.sync_field_type = find(self.field_types, functions.SYNC)
    self.length_field_type = find(self.field_types, functions.LENGTH)
    self.sequence_number_field_type = find(self.field_types, functions.SEQUENCE_NUMBER)
    self.dst_address_field_type = find(self.field_types, functions.DST_ADDRESS)
    self.src_address_field_type = find(self.field_types, functions.SRC_ADDRESS)

    self.protocol = ProtocolAnalyzer(None)
    with open(get_path_for_data_file("awre_consistent_addresses.txt")) as f:
        for line in f:
            message = Message.from_plain_bits_str(line.replace("\n", ""))
            self.protocol.messages.append(message)
            message.message_type = self.protocol.default_message_type

    # Assign participants
    alice = Participant("Alice", "A")
    bob = Participant("Bob", "B")
    alice_indices = {1, 2, 5, 6, 9, 10, 13, 14, 17, 18, 20, 22, 23, 26, 27, 30, 31, 34, 35, 38, 39, 41}
    for index, message in enumerate(self.protocol.messages):
        if index in alice_indices:
            message.participant = alice
        else:
            message.participant = bob
    self.participants = [alice, bob]

    self.zero_crc_protocol = ProtocolAnalyzer(None)
    with open(get_path_for_data_file("awre_zeroed_crc.txt")) as f:
        for line in f:
            message = Message.from_plain_bits_str(line.replace("\n", ""))
            self.zero_crc_protocol.messages.append(message)
            # NOTE(review): the message type is taken from self.protocol, not from
            # self.zero_crc_protocol -- confirm this sharing is intended.
            message.message_type = self.protocol.default_message_type
def __build_protocol(self):
    """Create the fixture protocol used by the tests.

    The returned ProtocolAnalyzer has no signal and holds ``NUM_MESSAGES``
    messages of ``BITS_PER_MESSAGE`` True bits each, with pause 1000 and the
    analyzer's default message type.
    """
    analyzer = ProtocolAnalyzer(signal=None)
    # A fresh bit list is built for every message so no two messages share one.
    analyzer.messages.extend(
        Message([True] * self.BITS_PER_MESSAGE, pause=1000, message_type=analyzer.default_message_type)
        for _ in range(self.NUM_MESSAGES)
    )
    return analyzer
# NOTE(review): fragment -- the enclosing definition (and where `signal` and
# `samples_per_symbol` come from) is outside the visible source, and the original
# indentation is lost.
# Derive `ppseq` from the signal's demodulation settings (presumably a pulse/pause
# length sequence, judging by the helper's name -- TODO confirm).
ppseq = signal_functions.grab_pulse_lens(signal.qad, signal.center, signal.tolerance,
signal.modulation_type, signal.samples_per_symbol,
signal.bits_per_symbol, signal.center_spacing)
# Turn the sequence into per-message bit lists, pauses and per-bit sample positions.
# NOTE(review): this call passes the local `samples_per_symbol` and
# `self.signal.bits_per_symbol`, while the call above reads both from `signal` --
# confirm they always agree.
bit_data, pauses, bit_sample_pos = self._ppseq_to_bits(ppseq, samples_per_symbol, self.signal.bits_per_symbol,
pause_threshold=signal.pause_threshold)
# Only for ASK signals with a message length divisor above 1; presumably adjusts the
# messages in place so their length is a multiple of the divisor -- TODO confirm.
if signal.message_length_divisor > 1 and signal.modulation_type == "ASK":
self.__ensure_message_length_multiple(bit_data, signal.samples_per_symbol, pauses, bit_sample_pos,
signal.message_length_divisor)
# `i` is the message index used to pick the matching entry of bit_sample_pos.
i = 0
for bits, pause in zip(bit_data, pauses):
# RSSI is the mean normalized magnitude over one symbol length of samples,
# starting at the sample position of the message's middle bit.
middle_bit_pos = bit_sample_pos[i][int(len(bits) / 2)]
start, end = middle_bit_pos, middle_bit_pos + samples_per_symbol
rssi = np.mean(signal.iq_array.subarray(start, end).magnitudes_normalized)
message = Message(bits, pause, message_type=self.default_message_type,
samples_per_symbol=samples_per_symbol, rssi=rssi, decoder=self.decoder,
bit_sample_pos=bit_sample_pos[i], bits_per_symbol=signal.bits_per_symbol)
self.messages.append(message)
i += 1
# Emit the protocol_updated signal.
# NOTE(review): with indentation lost it is not visible whether this runs once after
# the loop or once per message -- confirm against the original file.
self.qt_signals.protocol_updated.emit()
# NOTE(review): fragment -- begins inside an if/elif chain whose opening branches and
# enclosing definition are not visible; original indentation is lost.
# The chain maps the modulation name to one of the "MOD_*" identifiers handed to
# configure_rfcat below.
modulation = "MOD_GFSK"
elif modulation == "PSK":
modulation = "MOD_MSK"
else: # Fallback
modulation = "MOD_ASK_OOK"
# NOTE(review): only sample_rates[0] and messages[0] are used for configuration, while
# the send loop below indexes sample_rates per message -- confirm the rates match.
self.configure_rfcat(modulation=modulation, freq=self.project_manager.device_conf["frequency"],
sample_rate=sample_rates[0], samples_per_symbol=messages[0].samples_per_symbol)
# A non-positive repeat setting becomes -1, which the loop condition below treats as
# "repeat until interrupted".
repeats_from_settings = constants.SETTINGS.value('num_sending_repeats', type=int)
repeats = repeats_from_settings if repeats_from_settings > 0 else -1
# NOTE(review): `== False` could be written as `not ...` -- behaviour differs only if
# the flag can hold a non-bool falsy value.
while (repeats > 0 or repeats == -1) and self.__sending_interrupt_requested == False:
logger.debug("Start iteration ({} left)".format(repeats if repeats > 0 else "infinite"))
for i, msg in enumerate(messages):
if self.__sending_interrupt_requested:
break
assert isinstance(msg, Message)
# Pause divided by the sample rate at the same index; logged below as seconds
# (assumes msg.pause is a sample count -- TODO confirm).
wait_time = msg.pause / sample_rates[i]
self.current_send_message_changed.emit(i)
# send_data's result is treated as an error description: falsy means success.
error = self.send_data(self.bit_str_to_bytearray(msg.encoded_bits_str))
if not error:
logger.debug("Sent message {0}/{1}".format(i+1, len(messages)))
logger.debug("Waiting message pause: {0:.2f}s".format(wait_time))
# Re-check the interrupt flag so a stop request skips the pause.
if self.__sending_interrupt_requested:
break
time.sleep(wait_time)
else:
self.is_sending = False
# NOTE(review): the text says "Could not connect" although the failing call is
# send_data -- confirm the wording is intended.
Errors.generic_error("Could not connect to {0}:{1}".format(self.client_ip, self.client_port), msg=error)
break
# Only finite repeat counts are decremented; -1 stays -1.
if repeats > 0:
repeats -= 1
# NOTE(review): the next line refers to self.device and "EndlessSender"; it looks like
# the tail of a different snippet whose definition is not visible.
self.device.stop("EndlessSender stopped.")
def push_data(self, data: np.ndarray):
    """Push *data* into this object's ring buffer.

    :param data: array handed unchanged to ``self.ringbuffer.push``
    """
    ring = self.ringbuffer
    ring.push(data)
if __name__ == '__main__':
    # Manual demo: start an EndlessSender on a HackRF backend, push one modulated
    # message into it, let it run for a few seconds and stop it again.
    from urh.dev.BackendHandler import BackendHandler
    from urh.signalprocessing.Message import Message
    from urh.signalprocessing.MessageType import MessageType
    from urh.signalprocessing.Modulator import Modulator
    from urh.util.Logger import logger
    import time

    endless_sender = EndlessSender(BackendHandler(), "HackRF")

    # Demo payload: 16 x "10", then 8 x "1100", 8 x "0011" and 4 x "1011100111".
    bits = [1, 0] * 16 + [1, 1, 0, 0] * 8 + [0, 0, 1, 1] * 8 + [1, 0, 1, 1, 1, 0, 0, 1, 1, 1] * 4
    msg = Message(bits, 0, MessageType("empty_message_type"))

    modulator = Modulator("test_modulator")
    modulator.samples_per_symbol = 1000
    modulator.carrier_freq_hz = 55e3

    logger.debug("Starting endless sender")
    endless_sender.start()
    time.sleep(1)

    logger.debug("Pushing data")
    samples = modulator.modulate(msg.encoded_bits)
    endless_sender.push_data(samples)
    logger.debug("Pushed data")
    time.sleep(5)

    logger.debug("Stopping endless sender")
    endless_sender.stop()
    time.sleep(1)

    logger.debug("bye")
def from_binary(self, filename: str):
    """Load a raw binary file and append its content as one message.

    The file is read as unsigned bytes. Every byte is expanded into eight
    bits, most significant bit first, and the resulting bit list becomes a
    single message with pause 0 and the default message type.

    :param filename: path of the binary file to read
    """
    aggregated = np.fromfile(filename, dtype=np.uint8)
    # np.unpackbits expands each uint8 MSB-first (its default bit order), which is the
    # same order "{0:08b}".format(n) produced, but without formatting a string and
    # calling int() for every single bit. tolist() yields plain Python ints.
    unaggregated = np.unpackbits(aggregated).tolist()
    self.messages.append(Message(unaggregated, 0, self.default_message_type))
def redo(self):
    """Split message ``self.msg_nr`` at bit position ``self.pos`` into two messages.

    Both parts are built from a deep copy of the original message. The first
    part holds the bits before ``pos`` and gets pause 0; the second holds the
    remaining bits and keeps the original pause. Both get rssi 0 and the
    copy's decoder, message type and samples per symbol. The first part
    replaces the original message and the second is inserted right after it.
    """
    messages = self.proto_analyzer.messages
    index = self.msg_nr
    source = copy.deepcopy(messages[index])

    # Constructor arguments that are the same for both halves.
    shared = dict(
        rssi=0,
        decoder=source.decoder,
        message_type=source.message_type,
        samples_per_symbol=source.samples_per_symbol,
    )
    head = Message(plain_bits=source.plain_bits[:self.pos], pause=0, **shared)
    tail = Message(plain_bits=source.plain_bits[self.pos:], pause=source.pause, **shared)

    messages[index] = head
    messages.insert(index + 1, tail)