Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def load_signal_from_disk(filename=None, sampling_rate=2000):
    """Return an ECG signal together with its sampling rate.

    Parameters
    ----------
    filename : str, optional
        Name of a CSV file inside the ``ecg_data`` directory located next to
        this module. An absolute path is used as-is. When ``None``, a
        10-second signal is simulated with ``nk.ecg_simulate`` instead.
    sampling_rate : int
        Sampling rate handed to the simulator and returned unchanged. It is
        not read from the file.

    Returns
    -------
    tuple
        ``(ecg, sampling_rate)``. ``ecg`` is either the simulated signal or
        the second column of the CSV file as a NumPy array.
    """
    if filename is None:
        ecg = nk.ecg_simulate(duration=10, sampling_rate=sampling_rate, method="ecgsyn")
        return ecg, sampling_rate

    data_dir = pathlib.Path(__file__).resolve().parent / "ecg_data"
    csv_path = (data_dir / filename).resolve().as_posix()
    # Pick the column *before* converting to NumPy: converting the whole frame
    # first would coerce every column to one common dtype, so a single
    # non-numeric column (e.g. timestamps) would turn the ECG into objects.
    ecg = pd.read_csv(csv_path).iloc[:, 1].to_numpy()
    return ecg, sampling_rate
def test_hrv():
# Exercises nk.hrv() on a 60 s ECG simulated at 1000 Hz with heart_rate=110.
# NOTE(review): this excerpt is cut off inside the `columns` list below; the
# closing bracket and whatever uses `columns` / `ecg_hrv` are not visible here.
# random_state=42 is passed to the simulator, presumably for reproducibility.
ecg = nk.ecg_simulate(duration=60, sampling_rate=1000, heart_rate=110, random_state=42)
# Only the second return value of ecg_process (the peaks) is used.
_, peaks = nk.ecg_process(ecg, sampling_rate=1000)
ecg_hrv = nk.hrv(peaks, sampling_rate=1000)
# HRV index names; presumably the columns expected in `ecg_hrv` — TODO confirm
# against the complete test.
columns = ['HRV_RMSSD', 'HRV_MeanNN', 'HRV_SDNN', 'HRV_SDSD', 'HRV_CVNN',
'HRV_CVSD', 'HRV_MedianNN', 'HRV_MadNN', 'HRV_MCVNN', 'HRV_IQRNN',
'HRV_pNN50', 'HRV_pNN20', 'HRV_TINN', 'HRV_HTI', 'HRV_ULF',
'HRV_VLF', 'HRV_LF', 'HRV_HF', 'HRV_VHF', 'HRV_LFHF', 'HRV_LFn',
'HRV_HFn', 'HRV_LnHF', 'HRV_SD1', 'HRV_SD2', 'HRV_SD1SD2', 'HRV_S',
'HRV_CSI', 'HRV_CVI', 'HRV_CSI_Modified', 'HRV_PIP', 'HRV_IALS',
'HRV_PSS', 'HRV_PAS', 'HRV_GI', 'HRV_SI', 'HRV_AI', 'HRV_PI',
'HRV_C1d', 'HRV_C1a', 'HRV_SD1d',
'HRV_SD1a', 'HRV_C2d',
'HRV_C2a', 'HRV_SD2d', 'HRV_SD2a',
'HRV_Cd', 'HRV_Ca', 'HRV_SDNNd',
# NOTE(review): fragment — these statements start mid-function. `info_gamboa`,
# `ecg` and `sampling_rate` are defined outside the visible excerpt.
# Each section cleans the signal with a method, detects R-peaks with the
# matching method, and checks how many peaks were found.
assert info_gamboa["ECG_R_Peaks"].size == 69
# Test elgendi2010 method
info_elgendi = nk.ecg_findpeaks(nk.ecg_clean(ecg, method="elgendi2010"), method="elgendi2010")
assert info_elgendi["ECG_R_Peaks"].size == 70
# Test engzeemod2012 method
info_engzeemod = nk.ecg_findpeaks(nk.ecg_clean(ecg, method="engzeemod2012"), method="engzeemod2012")
assert info_engzeemod["ECG_R_Peaks"].size == 70
# Test kalidas2017 method
info_kalidas = nk.ecg_findpeaks(nk.ecg_clean(ecg, method="kalidas2017"), method="kalidas2017")
# atol=1 accepts a peak count of 67, 68 or 69.
assert np.allclose(info_kalidas["ECG_R_Peaks"].size, 68, atol=1)
# Test martinez2003 method
# A fresh signal is simulated here without method="simple"; it replaces `ecg`.
ecg = nk.ecg_simulate(duration=60, sampling_rate=sampling_rate, noise=0, random_state=42)
ecg_cleaned = nk.ecg_clean(ecg, sampling_rate=sampling_rate, method="neurokit")
info_martinez = nk.ecg_findpeaks(ecg_cleaned, method="martinez2003")
# atol=1 accepts a peak count of 68, 69 or 70.
assert np.allclose(info_martinez["ECG_R_Peaks"].size, 69, atol=1)
def test_ecg_simulate():
    """Check the length of a simulated ECG and how its peak count compares.

    A signal simulated with ``heart_rate=500`` must contain more peaks above
    0.6 than both the noise-free "simple" signal and a default signal of the
    same length.
    """

    def count_peaks(signal):
        # Number of peaks whose height is at least 0.6.
        return len(nk.signal_findpeaks(signal, height_min=0.6)["Peaks"])

    simple_ecg = nk.ecg_simulate(duration=20, length=5000, method="simple", noise=0)
    assert len(simple_ecg) == 5000

    fast_ecg = nk.ecg_simulate(duration=20, length=5000, heart_rate=500)
    assert count_peaks(simple_ecg) < count_peaks(fast_ecg)

    default_ecg = nk.ecg_simulate(duration=10, length=5000)
    assert count_peaks(fast_ecg) > count_peaks(default_ecg)
def test_hrv_frequency():
    """Compare frequency-domain HRV of an ECG with a resampled copy of it.

    The same 60 s simulated signal is analysed at 2000 Hz and, after
    resampling, at 500 Hz. The HF values must agree within 1.5, and the LF
    and VLF values must be NaN for both versions.
    """
    # Test frequency domain
    ecg1 = nk.ecg_simulate(duration=60, sampling_rate=2000, heart_rate=70, random_state=42)
    _, peaks1 = nk.ecg_process(ecg1, sampling_rate=2000)
    hrv1 = nk.hrv_frequency(peaks1, sampling_rate=2000)

    ecg2 = nk.signal_resample(ecg1, sampling_rate=2000, desired_sampling_rate=500)
    _, peaks2 = nk.ecg_process(ecg2, sampling_rate=500)
    hrv2 = nk.hrv_frequency(peaks2, sampling_rate=500)

    assert np.allclose(hrv1["HRV_HF"] - hrv2["HRV_HF"], 0, atol=1.5)
    assert np.isnan(hrv1["HRV_LF"][0])
    assert np.isnan(hrv2["HRV_LF"][0])
    assert np.isnan(hrv1["HRV_VLF"][0])
    # Previously this line re-checked hrv2["HRV_LF"], leaving the VLF value of
    # the resampled signal untested.
    assert np.isnan(hrv2["HRV_VLF"][0])
def test_ecg_clean():
# Checks nk.ecg_clean() on a simulated noisy ECG.
# NOTE(review): this excerpt is cut off inside the biosppy.tools.filter_signal(
# call at the end; its remaining arguments and the comparison against
# `ecg_biosppy` are not visible here.
sampling_rate = 1000
noise = 0.05
ecg = nk.ecg_simulate(sampling_rate=sampling_rate, noise=noise)
ecg_cleaned_nk = nk.ecg_clean(ecg, sampling_rate=sampling_rate, method="neurokit")
# Cleaning must not change the number of samples.
assert ecg.size == ecg_cleaned_nk.size
# Assert that highpass filter with .5 Hz lowcut was applied.
# Magnitude spectra of the raw and cleaned signals, with matching frequency bins.
fft_raw = np.abs(np.fft.rfft(ecg))
fft_nk = np.abs(np.fft.rfft(ecg_cleaned_nk))
freqs = np.fft.rfftfreq(ecg.size, 1 / sampling_rate)
# Summed magnitude below 0.5 Hz must be smaller after cleaning.
assert np.sum(fft_raw[freqs < 0.5]) > np.sum(fft_nk[freqs < 0.5])
# Comparison to biosppy (https://github.com/PIA-Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/ecg.py#L69)
ecg_biosppy = nk.ecg_clean(ecg, sampling_rate=sampling_rate, method="biosppy")
original, _, _ = biosppy.tools.filter_signal(
signal=ecg,
def test_ecg_findpeaks():
    """Check the number of R-peaks found by different ``ecg_findpeaks`` methods.

    A 60 s noise-free "simple" ECG is simulated at 1000 Hz with a fixed
    random state. The default method is also run with ``show=True`` to check
    that the resulting figure has two axes.
    """
    sampling_rate = 1000

    ecg = nk.ecg_simulate(duration=60, sampling_rate=sampling_rate, noise=0, method="simple", random_state=42)
    ecg_cleaned = nk.ecg_clean(ecg, sampling_rate=sampling_rate, method="neurokit")

    # Test neurokit method with show=True
    info_nk = nk.ecg_findpeaks(ecg_cleaned, show=True)
    assert info_nk["ECG_R_Peaks"].size == 69
    # This will identify the latest figure.
    fig = plt.gcf()
    assert len(fig.axes) == 2
    # Release the figure so it does not stay open for the rest of the session.
    plt.close(fig)

    # Test pantompkins1985 method
    info_pantom = nk.ecg_findpeaks(nk.ecg_clean(ecg, method="pantompkins1985"), method="pantompkins1985")
    assert info_pantom["ECG_R_Peaks"].size == 70

    # Test hamilton2002 method
    # NOTE(review): the hamilton2002 checks are not visible in this excerpt.
# Load the NeuroKit package
import neurokit2 as nk

# Sampling rate (samples / second) shared by every step below, kept in one
# place so simulation, processing and epoching cannot drift apart.
sampling_rate = 250

# Simulate 30 seconds of ECG signal
ecg_signal = nk.ecg_simulate(duration=30, sampling_rate=sampling_rate)

# Automatically process the (raw) ECG signal
signals, info = nk.ecg_process(ecg_signal, sampling_rate=sampling_rate)

# Extract clean ECG and R-peaks location
rpeaks = info["ECG_R_Peaks"]
cleaned_ecg = signals["ECG_Clean"]

# Visualize R-peaks in ECG signal
nk.events_plot(rpeaks, cleaned_ecg)

# Segment the signal around the R-peaks: each epoch starts 0.4 before the
# event and has a duration of 1 (the units are defined by epochs_create)
epochs = nk.epochs_create(
    cleaned_ecg,
    events=rpeaks,
    sampling_rate=sampling_rate,
    epochs_start=-0.4,
    epochs_duration=1,
)

# Plotting all the heart beats
nk.epochs_plot(epochs)
import numpy as np
import pandas as pd
import neurokit2 as nk
# Sampling rate used for both simulation and peak detection
sampling_rate = 1000

for heartrate in [80]:
    # Simulate signal
    # The keyword is `heart_rate`; the previous `heartrate=` spelling did not
    # match the parameter name used by every other ecg_simulate call.
    ecg = nk.ecg_simulate(duration=60,
                          sampling_rate=sampling_rate,
                          heart_rate=heartrate,
                          noise=0)

    # Segment
    _, rpeaks = nk.ecg_peaks(ecg, sampling_rate=sampling_rate)
    # _, waves = nk.ecg_delineator(ecg, rpeaks=rpeaks["ECG_R_Peaks"])