Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_ecg_delineate():
    """Check the number of waves returned by ``nk.ecg_delineate``.

    With the default method (labelled "derivative" in this test), every
    checked wave type must have exactly one entry per detected R-peak.
    With ``method="cwt"`` the counts are compared against fixed expected
    values, allowing a difference of one.
    """
    fs = 1000

    # Simulated signal with a fixed seed so the counts are reproducible.
    simulated = nk.ecg_simulate(duration=20, sampling_rate=fs, random_state=42)
    _, rpeaks = nk.ecg_peaks(simulated, sampling_rate=fs)
    n_beats = len(rpeaks["ECG_R_Peaks"])

    # Method 1: derivative
    _, derivative_waves = nk.ecg_delineate(simulated, rpeaks, sampling_rate=fs)
    derivative_keys = (
        "ECG_P_Peaks",
        "ECG_Q_Peaks",
        "ECG_S_Peaks",
        "ECG_T_Peaks",
        "ECG_P_Onsets",
        "ECG_T_Offsets",
    )
    for key in derivative_keys:
        assert len(derivative_waves[key]) == n_beats

    # Method 2: CWT
    _, cwt_waves = nk.ecg_delineate(simulated, rpeaks, sampling_rate=fs, method="cwt")
    expected_cwt_counts = (
        ("ECG_P_Peaks", 22),
        ("ECG_T_Peaks", 22),
        ("ECG_R_Onsets", 23),
    )
    for key, expected in expected_cwt_counts:
        assert np.allclose(len(cwt_waves[key]), expected, atol=1)
def test_ecg_peaks():
    """Check ``nk.ecg_peaks`` output shape and peak count on a noisy signal.

    A 120 s ECG is simulated with a fixed seed, cleaned with the "neurokit"
    method, and run through peak detection both without and with artifact
    correction. In both cases the result must have one row per sample and a
    single column, and the number of marked R-peaks must be 139 (+/- 1).
    """
    sampling_rate = 1000
    duration = 120
    noise = 0.15
    expected_shape = (duration * sampling_rate, 1)

    ecg = nk.ecg_simulate(
        duration=duration, sampling_rate=sampling_rate, noise=noise, random_state=42
    )
    ecg_cleaned_nk = nk.ecg_clean(ecg, sampling_rate=sampling_rate, method="neurokit")

    # The sampling rate is passed explicitly so that detection does not
    # silently rely on the library default matching the simulated signal.
    for correct_artifacts in (False, True):
        signals, _ = nk.ecg_peaks(
            ecg_cleaned_nk,
            sampling_rate=sampling_rate,
            correct_artifacts=correct_artifacts,
            method="neurokit",
        )
        assert signals.shape == expected_shape
        # int64 accumulator avoids any overflow when summing the peak markers.
        n_peaks = signals["ECG_R_Peaks"].values.sum(dtype=np.int64)
        assert np.allclose(n_peaks, 139, atol=1)
def rollingz(ecg, sampling_rate):
    """Return the R-peaks detected after windowed standardization of *ecg*.

    The signal is standardized via ``nk.standardize`` with
    ``window=sampling_rate * 2`` and then handed to ``nk.ecg_peaks`` using
    the "neurokit" method; only the ``"ECG_R_Peaks"`` entry of the returned
    info is kept.
    """
    window = sampling_rate * 2
    standardized = nk.standardize(ecg, window=window)
    _, peak_info = nk.ecg_peaks(
        standardized, sampling_rate=sampling_rate, method="neurokit"
    )
    return peak_info["ECG_R_Peaks"]
# Save the current figure as the README signal-processing illustration.
fig = plt.gcf()
fig.set_size_inches(10, 6)
# NOTE: `h_pad` is a tight_layout padding option, not a savefig argument;
# recent Matplotlib releases raise TypeError on unknown savefig keywords,
# so it is no longer passed here.
fig.savefig("README_signalprocessing.png", dpi=300)
# =============================================================================
# Heart Rate Variability
# =============================================================================
# Download the example dataset (name suggests 8 min of resting data at 100 Hz).
data = nk.data("bio_resting_8min_100hz")

# Find peaks in the ECG channel; the sampling rate must match the recording.
peaks, info = nk.ecg_peaks(data["ECG"], sampling_rate=100)

# Compute HRV indices; show=True also draws the summary plot saved below.
hrv = nk.hrv(peaks, sampling_rate=100, show=True)
# Bare expression: displays the result when run interactively / in a notebook.
hrv

# Save the figure produced by nk.hrv.
fig = plt.gcf()
fig.set_size_inches(10*1.5, 6*1.5, forward=True)
# NOTE: `h_pad` is a tight_layout padding option, not a savefig argument;
# recent Matplotlib releases raise TypeError on unknown savefig keywords,
# so it is no longer passed here.
fig.savefig("README_hrv.png", dpi=300)
# =============================================================================
# ECG Delineation
# =============================================================================
# Download data
import numpy as np
import pandas as pd
import neurokit2 as nk
# Sampling rate shared by the simulation and the R-peak detection below.
sampling_rate = 1000

for heartrate in [80]:
    # Simulate a noise-free ECG (noise=0) at the current heart rate.
    ecg = nk.ecg_simulate(
        duration=60, sampling_rate=sampling_rate, heartrate=heartrate, noise=0
    )

    # Locate the R-peaks that would drive segmentation / delineation.
    _, rpeaks = nk.ecg_peaks(ecg, sampling_rate=sampling_rate)
    # Disabled in the original script:
    # _, waves = nk.ecg_delineator(ecg, rpeaks=rpeaks["ECG_R_Peaks"])
def tarvainen(ecg, sampling_rate):
    """Return the R-peaks detected after "tarvainen2002" detrending of *ecg*.

    The signal is detrended with ``nk.signal_detrend`` and then passed to
    ``nk.ecg_peaks`` using the "neurokit" method; only the
    ``"ECG_R_Peaks"`` entry of the returned info is kept.
    """
    detrended = nk.signal_detrend(ecg, method="tarvainen2002")
    _, peak_info = nk.ecg_peaks(
        detrended, sampling_rate=sampling_rate, method="neurokit"
    )
    return peak_info["ECG_R_Peaks"]
def locreg(ecg, sampling_rate):
    """Return the R-peaks detected after "locreg" detrending of *ecg*.

    Detrending uses ``nk.signal_detrend`` with a window of
    ``0.5 * sampling_rate`` and a step size of ``0.02 * sampling_rate``;
    peaks are then found by ``nk.ecg_peaks`` with the "neurokit" method.
    """
    detrend_options = {
        "method": "locreg",
        "window": 0.5 * sampling_rate,
        "stepsize": 0.02 * sampling_rate,
    }
    detrended = nk.signal_detrend(ecg, **detrend_options)
    _, peak_info = nk.ecg_peaks(
        detrended, sampling_rate=sampling_rate, method="neurokit"
    )
    return peak_info["ECG_R_Peaks"]
def polylength(ecg, sampling_rate):
    """Return the R-peaks detected after polynomial detrending of *ecg*.

    The polynomial order scales with the recording: it is the integer part
    of half of ``len(ecg) / sampling_rate``. The detrended signal is then
    passed to ``nk.ecg_peaks`` with the "neurokit" method.
    """
    duration = len(ecg) / sampling_rate
    poly_order = int(duration / 2)
    detrended = nk.signal_detrend(ecg, method="polynomial", order=poly_order)
    _, peak_info = nk.ecg_peaks(
        detrended, sampling_rate=sampling_rate, method="neurokit"
    )
    return peak_info["ECG_R_Peaks"]