Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_emg_simulate():
    """Check the length of simulated EMG and the effect of the burst settings."""
    # `scipy.stats.median_absolute_deviation` was deprecated in SciPy 1.5 and
    # removed in 1.9. Prefer its replacement and fall back only on older
    # installations. The two differ by a constant positive scale factor, which
    # cannot change the ordering asserted below.
    mad = getattr(scipy.stats, "median_abs_deviation", None)
    if mad is None:
        mad = scipy.stats.median_absolute_deviation

    emg1 = nk.emg_simulate(duration=20, length=5000, burst_number=1)
    assert len(emg1) == 5000

    # The signal with 15 bursts is expected to be more dispersed than the one
    # with a single burst.
    emg2 = nk.emg_simulate(duration=20, length=5000, burst_number=15)
    assert mad(emg1) < mad(emg2)

    # With an explicit burst_duration of 2.0, more peaks are expected above the
    # height threshold than with the default burst duration.
    emg3 = nk.emg_simulate(duration=20, length=5000, burst_number=1, burst_duration=2.0)
    peaks_long_burst = nk.signal_findpeaks(emg3, height_min=1.0)["Peaks"]
    peaks_default_burst = nk.signal_findpeaks(emg1, height_min=1.0)["Peaks"]
    assert len(peaks_long_burst) > len(peaks_default_burst)
def test_emg_activation():
    """Check that `emg_activation` returns consistent onset/offset information."""
    raw = nk.emg_simulate(duration=10, burst_number=3)
    amplitude = nk.emg_amplitude(nk.emg_clean(raw))
    activity_signal, info = nk.emg_activation(amplitude)

    # The output columns and the keys of `info` must be the same set of names.
    column_names = set(activity_signal.columns.to_list())
    info_names = set(list(info.keys()))
    assert column_names == info_names

    # There must be as many onsets as offsets, and each onset must come first.
    onsets = info["EMG_Onsets"]
    offsets = info["EMG_Offsets"]
    assert len(onsets) == len(offsets)
    for onset, offset in zip(onsets, offsets):
        assert onset < offset
def test_bio_process():
    """Run `bio_process` on simulated signals and sanity-check the returned info."""
    sampling_rate = 1000

    # Create data
    ecg = nk.ecg_simulate(duration=30, sampling_rate=sampling_rate)
    rsp = nk.rsp_simulate(duration=30, sampling_rate=sampling_rate)
    eda = nk.eda_simulate(duration=30, sampling_rate=sampling_rate, scr_number=3)
    emg = nk.emg_simulate(duration=30, sampling_rate=sampling_rate, burst_number=3)

    bio_df, bio_info = nk.bio_process(ecg=ecg, rsp=rsp, eda=eda, emg=emg, sampling_rate=sampling_rate)

    # SCR components: every SCR entry has the same length, and the events are
    # ordered onset < peak < recovery.
    scr = [val for key, val in bio_info.items() if "SCR" in key]
    assert all(len(elem) == len(scr[0]) for elem in scr)
    assert all(bio_info["SCR_Onsets"] < bio_info["SCR_Peaks"])
    assert all(bio_info["SCR_Peaks"] < bio_info["SCR_Recovery"])

    # RSP
    assert all(bio_info["RSP_Peaks"] > bio_info["RSP_Troughs"])
    assert len(bio_info["RSP_Peaks"]) == len(bio_info["RSP_Troughs"])

    # EMG
    assert all(bio_info["EMG_Offsets"] > bio_info["EMG_Onsets"])
    # The closing parenthesis used to wrap the whole comparison, so the
    # assertion measured the length of `offsets == len(onsets)` instead of
    # comparing the two lengths.
    assert len(bio_info["EMG_Offsets"]) == len(bio_info["EMG_Onsets"])
def test_emg_simulate():
    """Check the length of simulated EMG and the effect of the burst settings."""
    # `scipy.stats.median_absolute_deviation` was deprecated in SciPy 1.5 and
    # removed in 1.9. Prefer its replacement and fall back only on older
    # installations. The two differ by a constant positive scale factor, which
    # cannot change the ordering asserted below.
    mad = getattr(scipy.stats, "median_abs_deviation", None)
    if mad is None:
        mad = scipy.stats.median_absolute_deviation

    emg1 = nk.emg_simulate(duration=20, length=5000, burst_number=1)
    assert len(emg1) == 5000

    # The signal with 15 bursts is expected to be more dispersed than the one
    # with a single burst.
    emg2 = nk.emg_simulate(duration=20, length=5000, burst_number=15)
    assert mad(emg1) < mad(emg2)

    # With an explicit burst_duration of 2.0, more peaks are expected above the
    # height threshold than with the default burst duration.
    emg3 = nk.emg_simulate(duration=20, length=5000, burst_number=1, burst_duration=2.0)
    peaks_long_burst = nk.signal_findpeaks(emg3, height_min=1.0)["Peaks"]
    peaks_default_burst = nk.signal_findpeaks(emg1, height_min=1.0)["Peaks"]
    assert len(peaks_long_burst) > len(peaks_default_burst)
def test_emg_plot():
    """Check the layout of the figure produced by `emg_plot`."""
    sampling_rate = 1000
    emg = nk.emg_simulate(duration=10, sampling_rate=sampling_rate, burst_number=3)
    emg_summary, _ = nk.emg_process(emg, sampling_rate=sampling_rate)

    # Plot data over samples.
    nk.emg_plot(emg_summary)
    # Grab the current (most recently created) figure and its axes.
    fig = plt.gcf()
    axes = fig.get_axes()
    assert len(axes) == 2

    expected_titles = ["Raw and Cleaned Signal", "Muscle Activation"]
    for axis, expected in zip(axes, expected_titles):
        assert axis.get_title() == expected
    assert axes[1].get_xlabel() == "Samples"
    # Both panels must have identical x tick positions.
    np.testing.assert_array_equal(axes[0].get_xticks(), axes[1].get_xticks())
    plt.close(fig)

    # Plot data over time.
    nk.emg_plot(emg_summary, sampling_rate=sampling_rate)
def test_emg_eventrelated():
    """Check the event-related features computed from epoched EMG signals."""
    emg = nk.emg_simulate(duration=20, sampling_rate=1000, burst_number=3)
    emg_signals, _ = nk.emg_process(emg, sampling_rate=1000)
    epochs = nk.epochs_create(
        emg_signals, events=[3000, 6000, 9000], sampling_rate=1000, epochs_start=-0.1, epochs_end=1.9
    )
    emg_eventrelated = nk.emg_eventrelated(epochs)

    # Test amplitude features
    # The first epoch without activation is expected to have exactly four
    # missing values. Counting them directly on the row avoids calling `int()`
    # on a one-element Series, which recent pandas versions deprecate.
    no_activation = np.where(emg_eventrelated["EMG_Activation"] == 0)[0][0]
    assert int(pd.isna(emg_eventrelated.values[no_activation]).sum()) == 4

    # Both sums are scalars, so a plain comparison is sufficient; the previous
    # `np.alltrue` wrapper was removed in NumPy 2.0.
    mean_total = np.nansum(np.array(emg_eventrelated["EMG_Amplitude_Mean"]))
    max_total = np.nansum(np.array(emg_eventrelated["EMG_Amplitude_Max"]))
    assert mean_total < max_total

    assert len(emg_eventrelated["Label"]) == 3
# Run the preprocessing step (filtering, peak detection, etc.) on the signals
processed_data, info = nk.bio_process(
    ecg=data["ECG"],
    rsp=data["RSP"],
    eda=data["EDA"],
    sampling_rate=100,
)
# Derive the relevant features from the processed data
results = nk.bio_analyze(processed_data, sampling_rate=100)
# =============================================================================
# Simulate physiological signals
# =============================================================================
# Generate synthetic signals
ecg = nk.ecg_simulate(duration=10, heart_rate=70)
ppg = nk.ppg_simulate(duration=10, heart_rate=70)
rsp = nk.rsp_simulate(duration=10, respiratory_rate=15)
eda = nk.eda_simulate(duration=10, scr_number=3)
emg = nk.emg_simulate(duration=10, burst_number=2)

# Visualise biosignals
data = pd.DataFrame({"ECG": ecg,
                     "PPG": ppg,
                     "RSP": rsp,
                     "EDA": eda,
                     "EMG": emg})
nk.signal_plot(data, subplots=True)

# Save it
# The saved figure uses freshly simulated signals with the noise-related
# parameters set to zero.
data = pd.DataFrame({"ECG": nk.ecg_simulate(duration=10, heart_rate=70, noise=0),
                     "PPG": nk.ppg_simulate(duration=10, heart_rate=70, powerline_amplitude=0),
                     "RSP": nk.rsp_simulate(duration=10, respiratory_rate=15, noise=0),
                     "EDA": nk.eda_simulate(duration=10, scr_number=3, noise=0),
                     "EMG": nk.emg_simulate(duration=10, burst_number=2, noise=0)})
plot = data.plot(subplots=True, layout=(5, 1), color=['#f44336', "#E91E63", "#2196F3", "#9C27B0", "#FF9800"])
fig = plt.gcf()
fig.set_size_inches(10, 6, forward=True)
# Place every legend in the upper-right corner (loc=1).
for ax in fig.axes:
    ax.legend(loc=1)
# `h_pad` is a `tight_layout` option, not a `savefig` one; apply the vertical
# padding to the layout first, then write the file.
fig.tight_layout(h_pad=3)
fig.savefig("README_simulation.png", dpi=300)
# =============================================================================
# Electrodermal Activity (EDA) processing
# =============================================================================
# Simulate a 10-second EDA signal sampled at 250 samples / second with 2 SCR peaks
eda = nk.eda_simulate(
    duration=10,
    sampling_rate=250,
    scr_number=2,
    drift=0.1,
)
# Run the EDA processing on the simulated signal
signals, info = nk.eda_process(eda, sampling_rate=250)