Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_signal_detrend():
signal = np.cos(np.linspace(start=0, stop=10, num=1000)) # Low freq
signal += np.cos(np.linspace(start=0, stop=100, num=1000)) # High freq
signal += 3 # Add baseline
rez_nk = nk.signal_detrend(signal, order=1)
rez_scipy = scipy.signal.detrend(signal, type="linear")
assert np.allclose(np.mean(rez_nk - rez_scipy), 0, atol=0.000001)
rez_nk = nk.signal_detrend(signal, order=0)
rez_scipy = scipy.signal.detrend(signal, type="constant")
assert np.allclose(np.mean(rez_nk - rez_scipy), 0, atol=0.000001)
# Tarvainen
rez_nk = nk.signal_detrend(signal, method="tarvainen2002", regularization=500)
assert np.allclose(np.mean(rez_nk - signal), -2.88438737697, atol=0.000001)
fft_khodadad2018 = np.abs(np.fft.rfft(khodadad2018))
fft_biosppy = np.abs(np.fft.rfft(rsp_biosppy))
freqs = np.fft.rfftfreq(len(rsp), 1 / sampling_rate)
assert np.sum(fft_raw[freqs > 3]) > np.sum(fft_khodadad2018[freqs > 3])
assert np.sum(fft_raw[freqs < 0.05]) > np.sum(fft_khodadad2018[freqs < 0.05])
assert np.sum(fft_raw[freqs > 0.35]) > np.sum(fft_biosppy[freqs > 0.35])
assert np.sum(fft_raw[freqs < 0.1]) > np.sum(fft_biosppy[freqs < 0.1])
# Comparison to biosppy (https://github.com/PIA-Group/BioSPPy/blob/master/biosppy/signals/resp.py#L62)
rsp_biosppy = nk.rsp_clean(rsp, sampling_rate=sampling_rate, method="biosppy")
original, _, _ = biosppy.tools.filter_signal(
signal=rsp, ftype="butter", band="bandpass", order=2, frequency=[0.1, 0.35], sampling_rate=sampling_rate
)
original = nk.signal_detrend(original, order=0)
assert np.allclose((rsp_biosppy - original).mean(), 0, atol=1e-6)
def test_signal_detrend():
signal = np.cos(np.linspace(start=0, stop=10, num=1000)) # Low freq
signal += np.cos(np.linspace(start=0, stop=100, num=1000)) # High freq
signal += 3 # Add baseline
rez_nk = nk.signal_detrend(signal, order=1)
rez_scipy = scipy.signal.detrend(signal, type="linear")
assert np.allclose(np.mean(rez_nk - rez_scipy), 0, atol=0.000001)
rez_nk = nk.signal_detrend(signal, order=0)
rez_scipy = scipy.signal.detrend(signal, type="constant")
assert np.allclose(np.mean(rez_nk - rez_scipy), 0, atol=0.000001)
# Tarvainen
rez_nk = nk.signal_detrend(signal, method="tarvainen2002", regularization=500)
assert np.allclose(np.mean(rez_nk - signal), -2.88438737697, atol=0.000001)
def test_emg_clean():
sampling_rate = 1000
emg = nk.emg_simulate(duration=20, sampling_rate=sampling_rate)
emg_cleaned = nk.emg_clean(emg, sampling_rate=sampling_rate)
assert emg.size == emg_cleaned.size
# Comparison to biosppy (https://github.com/PIA-Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/emg.py)
original, _, _ = biosppy.tools.filter_signal(
signal=emg, ftype="butter", band="highpass", order=4, frequency=100, sampling_rate=sampling_rate
)
emg_cleaned_biosppy = nk.signal_detrend(original, order=0)
assert np.allclose((emg_cleaned - emg_cleaned_biosppy).mean(), 0, atol=1e-6)
def tarvainen(ecg, sampling_rate):
ecg = nk.signal_detrend(ecg, method="tarvainen2002")
signal, info = nk.ecg_peaks(ecg, sampling_rate=sampling_rate, method="neurokit")
return info["ECG_R_Peaks"]
distorted = nk.signal_detrend(distorted,
method=detrend_method,
order=detrend_order,
regularization=detrend_regularization,
alpha=detrend_alpha)
if filter_type != "None":
distorted = nk.signal_filter(signal=distorted,
sampling_rate=sampling_rate,
lowcut=filter_lowcut,
highcut=filter_highcut,
method=filter_type,
order=filter_order)
if detrend_position in ["Second", 'Both']:
distorted = nk.signal_detrend(distorted,
method=detrend_method,
order=int(detrend_order),
regularization=detrend_regularization,
alpha=detrend_alpha)
cleaned = distorted
extrema_signal, _ = nk.rsp_findpeaks(distorted, outlier_threshold=0)
try:
rate = nk.rsp_rate(peaks=extrema_signal, sampling_rate=sampling_rate)
except ValueError:
rate = np.full(len(distorted), np.nan)
info["Detrend_Method"] = [detrend_method]
info["Detrend_Order"] = [detrend_order]
info["Detrend_Regularization"] = [detrend_regularization]
info["Detrend_Alpha"] = [detrend_alpha]
# =============================================================================
# Generate original signal
original = nk.signal_simulate(duration=6, frequency=1)
# Distort the signal (add noise, linear trend, artifacts etc.)
distorted = nk.signal_distort(original,
noise_amplitude=0.1,
noise_frequency=[5, 10, 20],
powerline_amplitude=0.05,
artifacts_amplitude=0.3,
artifacts_number=3,
linear_drift=0.5)
# Clean (filter and detrend)
cleaned = nk.signal_detrend(distorted)
cleaned = nk.signal_filter(cleaned, lowcut=0.5, highcut=1.5)
# Compare the 3 signals
plot = nk.signal_plot([original, distorted, cleaned])
# Save plot
fig = plt.gcf()
fig.set_size_inches(10, 6)
fig.savefig("README_signalprocessing.png", dpi=300, h_pad=3)
# =============================================================================
# Heart Rate Variability
# =============================================================================
# Download data
def polylength(ecg, sampling_rate):
length = len(ecg) / sampling_rate
ecg = nk.signal_detrend(ecg, method="polynomial", order=int(length / 2))
signal, info = nk.ecg_peaks(ecg, sampling_rate=sampling_rate, method="neurokit")
return info["ECG_R_Peaks"]
def rsp_custom_process(distorted, info, detrend_position="First", detrend_method="polynomial", detrend_order=0, detrend_regularization=500, detrend_alpha=0.75, filter_type="None", filter_order=5, filter_lowcut=None, filter_highcut=None):
sampling_rate = info["Sampling_Rate"][0]
if detrend_position in ["First", 'Both']:
distorted = nk.signal_detrend(distorted,
method=detrend_method,
order=detrend_order,
regularization=detrend_regularization,
alpha=detrend_alpha)
if filter_type != "None":
distorted = nk.signal_filter(signal=distorted,
sampling_rate=sampling_rate,
lowcut=filter_lowcut,
highcut=filter_highcut,
method=filter_type,
order=filter_order)
if detrend_position in ["Second", 'Both']:
distorted = nk.signal_detrend(distorted,
method=detrend_method,