Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_eog_intervalrelated():
eog = nk.data('eog_200hz')['vEOG']
eog_signals, info = nk.eog_process(eog, sampling_rate=200)
columns = ['EOG_Peaks_N', 'EOG_Rate_Mean']
# Test with signal dataframe
features = nk.eog_intervalrelated(eog_signals)
assert all(elem in np.array(features.columns.values, dtype=str) for elem
in columns)
assert features.shape[0] == 1 # Number of rows
# Test with dict
epochs = nk.epochs_create(eog_signals, events=[5000, 10000, 15000], epochs_start=-0.1, epochs_end=1.9)
epochs_dict = nk.eog_intervalrelated(epochs)
assert all(elem in columns for elem
in np.array(epochs_dict.columns.values, dtype=str))
assert epochs_dict.shape[0] == len(epochs) # Number of rows
def test_rsp_eventrelated():
rsp, info = nk.rsp_process(nk.rsp_simulate(duration=30, random_state=42))
epochs = nk.epochs_create(rsp, events=[5000, 10000, 15000], epochs_start=-0.1, epochs_end=1.9)
rsp_eventrelated = nk.rsp_eventrelated(epochs)
# Test rate features
assert np.alltrue(np.array(rsp_eventrelated["RSP_Rate_Min"]) < np.array(rsp_eventrelated["RSP_Rate_Mean"]))
assert np.alltrue(np.array(rsp_eventrelated["RSP_Rate_Mean"]) < np.array(rsp_eventrelated["RSP_Rate_Max"]))
# Test amplitude features
assert np.alltrue(
np.array(rsp_eventrelated["RSP_Amplitude_Min"]) < np.array(rsp_eventrelated["RSP_Amplitude_Mean"])
)
assert np.alltrue(
np.array(rsp_eventrelated["RSP_Amplitude_Mean"]) < np.array(rsp_eventrelated["RSP_Amplitude_Max"])
)
def test_eog_eventrelated():
eog = nk.data('eog_200hz')['vEOG']
eog_signals, info = nk.eog_process(eog, sampling_rate=200)
epochs = nk.epochs_create(eog_signals, events=[5000, 10000, 15000], epochs_start=-0.1, epochs_end=1.9)
eog_eventrelated = nk.eog_eventrelated(epochs)
# Test rate features
assert np.alltrue(np.array(eog_eventrelated["EOG_Rate_Min"]) < np.array(eog_eventrelated["EOG_Rate_Mean"]))
assert np.alltrue(np.array(eog_eventrelated["EOG_Rate_Mean"]) < np.array(eog_eventrelated["EOG_Rate_Max"]))
# Test blink presence
assert np.alltrue(np.array(eog_eventrelated["EOG_Blinks_Presence"]) == np.array([1, 0, 0]))
def test_eda_intervalrelated():
data = nk.data("bio_resting_8min_100hz")
df, info = nk.eda_process(data["EDA"], sampling_rate=100)
columns = ["SCR_Peaks_N", "SCR_Peaks_Amplitude_Mean"]
# Test with signal dataframe
features_df = nk.eda_intervalrelated(df)
assert all(elem in columns for elem in np.array(features_df.columns.values, dtype=str))
assert features_df.shape[0] == 1 # Number of rows
# Test with dict
epochs = nk.epochs_create(df, events=[0, 25300], sampling_rate=100, epochs_end=20)
features_dict = nk.eda_intervalrelated(epochs)
assert all(elem in columns for elem in np.array(features_dict.columns.values, dtype=str))
assert features_dict.shape[0] == 2 # Number of rows
def test_eda_eventrelated():
eda = nk.eda_simulate(duration=15, scr_number=3)
eda_signals, info = nk.eda_process(eda, sampling_rate=1000)
epochs = nk.epochs_create(
eda_signals, events=[5000, 10000, 15000], sampling_rate=1000, epochs_start=-0.1, epochs_end=1.9
)
eda_eventrelated = nk.eda_eventrelated(epochs)
no_activation = np.where(eda_eventrelated["EDA_SCR"] == 0)[0][0]
assert int(pd.DataFrame(eda_eventrelated.values[no_activation]).isna().sum()) == 4
assert len(eda_eventrelated["Label"]) == 3
def test_bio_analyze():
# Example with event-related analysis
data = nk.data("bio_eventrelated_100hz")
df, info = nk.bio_process(
ecg=data["ECG"], rsp=data["RSP"], eda=data["EDA"], keep=data["Photosensor"], sampling_rate=100
)
events = nk.events_find(
data["Photosensor"], threshold_keep="below", event_conditions=["Negative", "Neutral", "Neutral", "Negative"]
)
epochs = nk.epochs_create(df, events, sampling_rate=100, epochs_start=-0.1, epochs_end=1.9)
event_related = nk.bio_analyze(epochs)
assert len(event_related) == len(epochs)
labels = [int(i) for i in event_related["Label"]]
assert labels == list(np.arange(1, len(epochs) + 1))
# Example with interval-related analysis
data = nk.data("bio_resting_8min_100hz")
df, info = nk.bio_process(ecg=data["ECG"], rsp=data["RSP"], eda=data["EDA"], sampling_rate=100)
interval_related = nk.bio_analyze(df)
assert len(interval_related) == 1
def test_emg_intervalrelated():
emg = nk.emg_simulate(duration=40, sampling_rate=1000, burst_number=3)
emg_signals, info = nk.emg_process(emg, sampling_rate=1000)
columns = ["EMG_Activation_N", "EMG_Amplitude_Mean"]
# Test with signal dataframe
features_df = nk.emg_intervalrelated(emg_signals)
assert all(elem in columns for elem in np.array(features_df.columns.values, dtype=str))
assert features_df.shape[0] == 1 # Number of rows
# Test with dict
epochs = nk.epochs_create(emg_signals, events=[0, 20000], sampling_rate=1000, epochs_end=20)
features_dict = nk.emg_intervalrelated(epochs)
assert all(elem in columns for elem in np.array(features_dict.columns.values, dtype=str))
assert features_dict.shape[0] == 2 # Number of rows
for i in range(events.shape[1]):
events[:, i] = nk.rescale(events[:, i], to=[0, 1]) # Reshape to 0-1 scale
try:
p_gamma[i, :], _ = scipy.optimize.curve_fit(fit_gamma, x, events[:, i], p0=[1, 2, 2, 3])
p_bateman[i, :], _ = scipy.optimize.curve_fit(fit_bateman, x, events[:, i], p0=[1, 1, 0.75, 2])
p_scr[i, :], _ = scipy.optimize.curve_fit(fit_scr, x, events[:, i], p0=[1, 3, 0.7, 3, 5])
p_poly[i, :] = params_poly(events[:, i], order=4)
except RuntimeError:
pass
# Visualize for one particpant
cleaned = nk.eog_clean(signals[0], sampling_rate=100, method='neurokit')
blinks = nk.eog_findpeaks(cleaned, sampling_rate=100, method="mne")
events = nk.epochs_create(cleaned, blinks, sampling_rate=100, epochs_start=-0.4, epochs_end=0.6)
events = nk.epochs_to_array(events)
for i in range(events.shape[1]):
events[:, i] = nk.rescale(events[:, i], to=[0, 1]) # Reshape to 0-1 scale
x = np.linspace(0, 100, num=len(events))
template_gamma = fit_gamma(x, *np.nanmedian(p_gamma, axis=0))
template_bateman = fit_bateman(x, *np.nanmedian(p_bateman, axis=0))
template_scr = fit_scr(x, *np.nanmedian(p_scr, axis=0))
template_poly = fit_poly(x, np.nanmedian(p_poly, axis=0))
plt.plot(events, linewidth=0.25, color="black")
plt.plot(template_gamma, linewidth=2, linestyle='-', color="red", label='Gamma')
plt.plot(template_bateman, linewidth=2, linestyle='-', color="blue", label='Bateman')
plt.plot(template_scr, linewidth=2, linestyle='-', color="orange", label='SCR')
plt.plot(template_poly, linewidth=2, linestyle='-', color="green", label='Polynomial')
data_rmse = pd.DataFrame(columns=["RMSE", "Index", "Participant", "Task", "Function"])
for i in range(4):
data = pd.read_csv("../../data/eogdb/eogdb_task" + str(i + 1) + ".csv")
for j, participant in enumerate(np.unique(data["Participant"])[1:3]):
segment = data[data["Participant"] == participant]
signal = segment["vEOG"]
cleaned = nk.eog_clean(signal, sampling_rate=200, method='neurokit')
blinks = nk.signal_findpeaks(cleaned, relative_height_min=1.5)["Peaks"]
events = nk.epochs_create(cleaned, blinks, sampling_rate=200, epochs_start=-0.4, epochs_end=0.6)
events = nk.epochs_to_array(events) # Convert to 2D array
# Rescale
for i in range(events.shape[1]):
events[:, i] = nk.rescale(events[:, i], to=[0, 1]) # Reshape to 0-1 scale
# RMSE - Gamma
rmse = pd.DataFrame({"RMSE": [nk.fit_rmse(events[:, i], template_gamma) for i in range(events.shape[1])],
"Index": range(events.shape[1]),
"Participant": [participant]*events.shape[1],
"Task": [data["Task"][0]]*events.shape[1],
"Function": ["Gamma"] * events.shape[1]})
rmse["Index"] = rmse["Participant"] + "_" + rmse["Task"] + "_" + rmse["Index"].astype(str)
data_rmse = pd.concat([data_rmse, rmse], axis=0)
# RMSE - SCR