Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_rsp_rrv():
    """Compare respiratory-rate-variability indices at 90 vs. 110 breaths/min.

    Two 60 s signals are simulated with the same seed and sampling rate, run
    through the clean -> peaks -> rate -> RRV pipeline, and the dispersion
    indices of the 90 breaths/min signal are asserted to be strictly smaller
    than those of the 110 breaths/min signal.
    """
    fs = 1000

    def _rrv_for(breaths_per_minute):
        # Full pipeline for one simulated signal; returns the RRV indices.
        raw = nk.rsp_simulate(
            duration=60, sampling_rate=fs, respiratory_rate=breaths_per_minute, random_state=42
        )
        cleaned = nk.rsp_clean(raw, sampling_rate=fs)
        _, peaks = nk.rsp_peaks(cleaned)
        rate = nk.signal_rate(peaks, desired_length=len(raw))
        return nk.rsp_rrv(rate, peaks)

    rrv_slow = _rrv_for(90)
    rrv_fast = _rrv_for(110)

    for index_name in ("RRV_SDBB", "RRV_RMSSD", "RRV_SDSD"):
        assert np.array(rrv_slow[index_name]) < np.array(rrv_fast[index_name])

    # Disabled check, kept for reference:
    # assert np.array(rrv_slow["RRV_pNN50"]) == np.array(rrv_fast["RRV_pNN50"]) == np.array(rrv_fast["RRV_pNN20"]) == np.array(rrv_slow["RRV_pNN20"]) == 0
def test_rsp_simulate():
    """Check the length and relative peak counts of simulated respiration.

    Verifies that ``length`` controls the number of samples, that a signal
    simulated at 80 breaths/min has more detected peaks than the default, and
    that the "sinusoidal" method yields more detected peaks than
    "breathmetrics" under otherwise identical settings.
    """

    def _peak_count(signal):
        # Number of peaks found with a minimum height of 0.2.
        return len(nk.signal_findpeaks(signal, height_min=0.2)["Peaks"])

    shared = dict(duration=20, length=3000, random_state=42)

    default_rsp = nk.rsp_simulate(**shared)
    assert len(default_rsp) == 3000

    fast_rsp = nk.rsp_simulate(respiratory_rate=80, **shared)
    # Visual inspection helpers (disabled):
    # pd.DataFrame({"RSP1": default_rsp, "RSP2": fast_rsp}).plot()
    # pd.DataFrame({"RSP1": default_rsp, "RSP2": fast_rsp}).hist()
    assert _peak_count(default_rsp) < _peak_count(fast_rsp)

    sinusoidal_rsp = nk.rsp_simulate(method="sinusoidal", **shared)
    breathmetrics_rsp = nk.rsp_simulate(method="breathmetrics", **shared)
    # pd.DataFrame({"RSP3": sinusoidal_rsp, "RSP4": breathmetrics_rsp}).plot()
    assert _peak_count(sinusoidal_rsp) > _peak_count(breathmetrics_rsp)
def test_rsp_amplitude():
    """Check ``rsp_amplitude`` on a noise-free sinusoidal signal.

    The amplitude is computed twice, once from each of the two objects
    returned by ``rsp_peaks``. In both cases the result must have one value
    per input sample and a mean within 0.01 of 1.
    """
    fs = 1000
    rsp = nk.rsp_simulate(
        duration=120, sampling_rate=fs, respiratory_rate=15, method="sinusoidal", noise=0
    )
    signals, info = nk.rsp_peaks(nk.rsp_clean(rsp, sampling_rate=fs))

    # First pass uses the first return value of rsp_peaks (`signals`),
    # second pass uses the second one (`info`).
    for peak_source in (signals, info):
        amplitude = nk.rsp_amplitude(rsp, peak_source)
        assert amplitude.shape == (rsp.size,)
        assert np.abs(amplitude.mean() - 1) < 0.01
def test_rsp_plot():
    """Check that ``rsp_plot`` draws three axes with the expected titles.

    A 120 s signal is simulated and processed, then plotted. The most recent
    figure is inspected for its number of axes and their titles. The figure
    is closed even when an assertion fails, so it cannot leak into later
    tests that look up the current figure.
    """
    rsp = nk.rsp_simulate(duration=120, sampling_rate=1000, respiratory_rate=15)
    rsp_summary, _ = nk.rsp_process(rsp, sampling_rate=1000)
    nk.rsp_plot(rsp_summary)

    # This will identify the latest figure.
    fig = plt.gcf()
    try:
        assert len(fig.axes) == 3

        expected_titles = ["Raw and Cleaned Signal", "Breathing Rate", "Breathing Amplitude"]
        for ax, title in zip(fig.get_axes(), expected_titles):
            assert ax.get_title() == title
    finally:
        # Always release the figure, including on assertion failure.
        plt.close(fig)
def test_rsp_simulate():
    """Check the length and relative peak counts of simulated respiration.

    Verifies that ``length`` controls the number of samples, that a signal
    simulated at 80 breaths/min has more detected peaks than the default, and
    that the "sinusoidal" method yields more detected peaks than
    "breathmetrics" under otherwise identical settings.
    """

    def _peak_count(signal):
        # Number of peaks found with a minimum height of 0.2.
        return len(nk.signal_findpeaks(signal, height_min=0.2)["Peaks"])

    shared = dict(duration=20, length=3000, random_state=42)

    default_rsp = nk.rsp_simulate(**shared)
    assert len(default_rsp) == 3000

    fast_rsp = nk.rsp_simulate(respiratory_rate=80, **shared)
    # Visual inspection helpers (disabled):
    # pd.DataFrame({"RSP1": default_rsp, "RSP2": fast_rsp}).plot()
    # pd.DataFrame({"RSP1": default_rsp, "RSP2": fast_rsp}).hist()
    assert _peak_count(default_rsp) < _peak_count(fast_rsp)

    sinusoidal_rsp = nk.rsp_simulate(method="sinusoidal", **shared)
    breathmetrics_rsp = nk.rsp_simulate(method="breathmetrics", **shared)
    # pd.DataFrame({"RSP3": sinusoidal_rsp, "RSP4": breathmetrics_rsp}).plot()
    assert _peak_count(sinusoidal_rsp) > _peak_count(breathmetrics_rsp)
def test_rsp_peaks():
    """Check extrema detection on a seeded 120 s simulated signal.

    Pins the shape of the per-sample output, the number of peaks and troughs
    (28 each), the sums of their sample indices, and the ordering of the
    first and last extrema.
    """
    rsp = nk.rsp_simulate(duration=120, sampling_rate=1000, respiratory_rate=15, random_state=42)
    signals, info = nk.rsp_peaks(nk.rsp_clean(rsp, sampling_rate=1000))

    assert signals.shape == (120000, 2)

    expected_count = 28
    for key in ("RSP_Peaks", "RSP_Troughs"):
        assert signals[key].sum() == expected_count
    for key in ("RSP_Peaks", "RSP_Troughs"):
        assert info[key].shape[0] == expected_count

    peaks = info["RSP_Peaks"]
    troughs = info["RSP_Troughs"]
    assert np.allclose(peaks.sum(), 1643817)
    assert np.allclose(troughs.sum(), 1586588)

    # Extrema must start with a trough and end with a peak.
    assert troughs[0] < peaks[0]
    assert troughs[-1] < peaks[-1]
def test_rsp_clean():
    """Check that both cleaning methods preserve the signal length.

    A noisy 120 s signal sampled at 100 Hz is simulated, the output of
    ``signal_distort`` with linear drift is added to it, and the result is
    cleaned with the "khodadad2018" and "biosppy" methods. Magnitude spectra
    of the raw and cleaned signals are then computed.
    """
    sampling_rate = 100
    duration = 120

    rsp = nk.rsp_simulate(
        duration=duration,
        sampling_rate=sampling_rate,
        respiratory_rate=15,
        noise=0.1,
        random_state=42,
    )
    # Add linear drift (to test baseline removal).
    rsp += nk.signal_distort(rsp, sampling_rate=sampling_rate, linear_drift=True)

    cleaned = {}
    for method in ("khodadad2018", "biosppy"):
        cleaned[method] = nk.rsp_clean(rsp, sampling_rate=sampling_rate, method=method)
        assert len(rsp) == len(cleaned[method])

    # Check if filter was applied.
    # NOTE(review): the spectra below are not used by any assertion in the
    # visible part of this test; the snippet appears to be cut off here.
    fft_raw = np.abs(np.fft.rfft(rsp))
    fft_khodadad2018 = np.abs(np.fft.rfft(cleaned["khodadad2018"]))
    fft_biosppy = np.abs(np.fft.rfft(cleaned["biosppy"]))
# NOTE(review): `ecg` is not defined in the visible part of this script;
# presumably it is created earlier (e.g. by a simulate call) -- confirm.
# Process the ECG signal, declaring a sampling rate of 250 samples / second
signals, info = nk.ecg_process(ecg, sampling_rate=250)
# Visualise the processing (the return value of this call is discarded)
nk.ecg_plot(signals, sampling_rate=250)
# Save it: plot a second time, keeping the returned object so that it can be
# resized and written to disk
plot = nk.ecg_plot(signals, sampling_rate=250)
plot.set_size_inches(10, 6, forward=True)
# NOTE(review): `h_pad` is forwarded to savefig as-is -- confirm that the
# installed Matplotlib version accepts this keyword.
plot.savefig("README_ecg.png", dpi=300, h_pad=3)
# =============================================================================
# Respiration (RSP) processing
# =============================================================================
# Generate one minute of RSP signal (recorded at 250 samples / second)
rsp = nk.rsp_simulate(duration=60, sampling_rate=250, respiratory_rate=15)
# Process it (this rebinds `signals` and `info` from the ECG section above)
signals, info = nk.rsp_process(rsp, sampling_rate=250)
# Visualise the processing (the return value of this call is discarded)
nk.rsp_plot(signals, sampling_rate=250)
# Save it: plot a second time, keeping the returned object so that it can be
# resized and written to disk
plot = nk.rsp_plot(signals, sampling_rate=250)
plot.set_size_inches(10, 6, forward=True)
# NOTE(review): same `h_pad` caveat as for the ECG figure above.
plot.savefig("README_rsp.png", dpi=300, h_pad=3)
# =============================================================================
# Electromyography (EMG) processing
# =============================================================================
def rsp_generate(duration=90, sampling_rate=1000, respiratory_rate=None, method="Complex"):
    """Simulate a noise-free respiration signal and describe how it was made.

    Parameters
    ----------
    duration
        Passed to ``nk.rsp_simulate`` as ``duration``.
    sampling_rate
        Passed to ``nk.rsp_simulate`` as ``sampling_rate``.
    respiratory_rate
        Passed to ``nk.rsp_simulate`` as ``respiratory_rate``. When ``None``,
        an integer is drawn with ``np.random.randint(10, 25)``.
    method
        ``"Simple"`` selects the "sinusoidal" simulation method; any other
        value selects "breathmetrics".

    Returns
    -------
    tuple
        ``(rsp, info)`` where ``rsp`` is the simulated signal and ``info`` is
        a dict of single-element lists recording the duration, sampling rate,
        respiratory rate and the ``method`` label as given by the caller.
    """
    if respiratory_rate is None:
        respiratory_rate = np.random.randint(10, 25)

    # Translate the caller-facing label into the simulator's method name.
    actual_method = "sinusoidal" if method == "Simple" else "breathmetrics"

    rsp = nk.rsp_simulate(
        duration=duration,
        sampling_rate=sampling_rate,
        respiratory_rate=respiratory_rate,
        noise=0,
        method=actual_method,
    )

    info = {
        "Duration": [duration],
        "Sampling_Rate": [sampling_rate],
        "Respiratory_Rate": [respiratory_rate],
        "Simulation": [method],
    }
    return rsp, info