Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_4_fsk(self):
bits = array.array("B", [1, 0, 1, 0, 1, 1, 0, 0, 0, 1])
parameters = array.array("f", [-20e3, -10e3, 10e3, 20e3])
result = modulate_c(bits, 100, "FSK", parameters, 2, 1, 40e3, 0, 1e6, 1000, 0, parameters[0])
signal = Signal("")
signal.iq_array = IQArray(result)
signal.bits_per_symbol = 2
signal.center = 0
signal.center_spacing = 0.1
proto_analyzer = ProtocolAnalyzer(signal)
proto_analyzer.get_protocol_from_signal()
self.assertEqual(proto_analyzer.plain_bits_str[0], "1010110001")
def test_brennenstuhl(self):
path = self.get_path("brennenstuhl_signal_ABCD_onoff.coco")
if not path:
return
data = Signal(path, "").iq_array
result = AutoInterpretation.estimate(data)
mod_type, bit_length = result["modulation_type"], result["bit_length"]
center, noise, tolerance = result["center"], result["noise"], result["tolerance"]
self.assertEqual(mod_type, "ASK")
self.assertEqual(bit_length, 300)
print("noise", noise, "center", center, "bit length", bit_length, "tolerance", tolerance)
demodulated = demodulate(data, mod_type, bit_length, center, noise, tolerance, pause_threshold=8)
print(demodulated)
self.assertEqual(len(demodulated), 64)
for i in range(64):
self.assertTrue(demodulated[i].startswith("88888888888"))
self.assertEqual(len(demodulated[i]), len(demodulated[0]))
def test_auto_detect_fsk(self):
signal = Signal(get_path_for_data_file("fsk.complex"), "FSK")
signal.modulation_type = 1
signal.qad_center = signal.estimate_qad_center()
self.assertTrue(-0.1 <= signal.qad_center <= 0)
signal.bit_len = signal.estimate_bitlen()
self.assertTrue(90 <= signal.bit_len <= 110)
modulator.modulation_type = modulation
modulator.samples_per_symbol = self.samples_per_symbol
if modulation == "ASK":
modulator.parameters[0] = 0
modulator.parameters[1] = 100
elif modulation == "FSK":
modulator.parameters[0] = 1000
modulator.parameters[1] = 2500
elif modulation == "PSK":
modulator.parameters[0] = -90
modulator.parameters[1] = 90
modulator.modulate(self.modulation_data, self.pause).tofile(filename)
signal = Signal(filename, modulation)
signal.modulation_type = modulation
signal.samples_per_symbol = self.samples_per_symbol
if modulation == "ASK":
signal.center = 0.5
elif modulation == "FSK":
signal.center = 0.0097
elif modulation == "PSK":
signal.center = 0
self.assertEqual(signal.num_samples, self.total_samples, msg=modulation)
pa = ProtocolAnalyzer(signal)
pa.get_protocol_from_signal()
self.assertEqual(1, len(pa.messages), msg=modulation)
self.assertEqual(self.modulation_data, pa.messages[0].plain_bits, msg=modulation)
def test_freq_detection(self):
s = Signal(get_path_for_data_file("steckdose_anlernen.complex"), "RWE")
s.noise_threshold = 0.06
s.qad_center = 0
s.bit_len = 100
pa = ProtocolAnalyzer(s)
pa.get_protocol_from_signal()
self.assertEqual(pa.messages[0].plain_bits_str,
"101010101010101010101010101010101001101001111101100110100111110111010010011000010110110101111"
"010111011011000011000101000010001001101100101111010110100110011100100110000101001110100001111"
"111101000111001110000101110100100111010110110100001101101101010100011011010001010110011100011"
"010100010101111110011010011001000000110010011010001000100100100111101110110010011111011100010"
"10110010100011111101110111000010111100111101001011101101011011010110101011100")
start, nsamples = pa.get_samplepos_of_bitseq(0, 0, 0, 1, False)
freq = s.estimate_frequency(start, start + nsamples, 1e6)
self.assertEqual(freq, 10000) # Freq for 1 is 10K
total_data = []
while True:
try:
data = connection.recv(65536)
if data:
total_data.append(data)
else:
break
except socket.timeout:
break
if len(total_data) == 0:
logger.error("Did not receive any data from socket.")
arr = IQArray(np.array(np.frombuffer(b"".join(total_data), dtype=np.complex64)))
signal = Signal("", "")
signal.iq_array = arr
pa = ProtocolAnalyzer(signal)
pa.get_protocol_from_signal()
return pa.plain_bits_str
def test_ask_50_center_detection(self):
message_indices = [(0, 8000), (18000, 26000), (36000, 44000), (54000, 62000), (72000, 80000)]
data = Signal(get_path_for_data_file("ask50.complex")).iq_array.data
rect = afp_demod(data, 0.0509, "ASK")
for start, end in message_indices:
center = detect_center(rect[start:end])
self.assertGreaterEqual(center, 0.4, msg="{}/{}".format(start, end))
self.assertLessEqual(center, 0.65, msg="{}/{}".format(start, end))
def test_auto_detect_elektromaten(self):
signal = Signal(get_path_for_data_file("elektromaten.complex"), "Elektromaten")
signal.modulation_type = 0
signal.qad_center = signal.estimate_qad_center()
self.assertTrue(0.0387 < signal.qad_center < 0.1183)
signal.bit_len = signal.estimate_bitlen()
self.assertTrue(270 <= signal.bit_len <= 330)
@pyqtSlot(int, Signal)
def on_signal_created(self, index: int, signal: Signal):
self.add_signal(signal, index=index)
from PyQt5.QtCore import QPoint, pyqtSignal, pyqtSlot
from PyQt5.QtWidgets import QWidget, QSizePolicy, QUndoStack, QCheckBox, QMessageBox
from urh import constants
from urh.controller.widgets.SignalFrame import SignalFrame
from urh.signalprocessing.Signal import Signal
from urh.ui.ui_tab_interpretation import Ui_Interpretation
from urh.util import util
class SignalTabController(QWidget):
frame_closed = pyqtSignal(SignalFrame)
not_show_again_changed = pyqtSignal()
signal_created = pyqtSignal(int, Signal)
files_dropped = pyqtSignal(list)
frame_was_dropped = pyqtSignal(int, int)
@property
def num_frames(self):
return len(self.signal_frames)
@property
def signal_frames(self):
"""
:rtype: list of SignalFrame
"""
splitter = self.ui.splitter
return [splitter.widget(i) for i in range(splitter.count())
if isinstance(splitter.widget(i), SignalFrame)]