Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _interpolate_big(peaks, sampling_rate=1000, interval_max=None, relative_interval_max=None, robust=False):
if interval_max is None and relative_interval_max is None:
return peaks
continue_loop = True
while continue_loop is True:
if interval_max is not None:
interval = signal_period(peaks, sampling_rate=sampling_rate, desired_length=len(peaks))
peaks, continue_loop = _interpolate_missing(peaks, interval, interval_max, sampling_rate)
if relative_interval_max is not None:
interval = signal_period(peaks, sampling_rate=sampling_rate, desired_length=len(peaks))
interval = standardize(interval, robust=robust)
peaks, continue_loop = _interpolate_missing(peaks, interval, interval_max, sampling_rate)
return peaks
def _signal_findpeaks_keep(
info, what="Height", below=None, above=None, relative_mean=False, relative_median=False, relative_max=False
):
if below is None and above is None:
return info
keep = np.full(len(info["Peaks"]), True)
if relative_max is True:
what = info[what] / np.max(info[what])
elif relative_median is True:
what = standardize(info[what], robust=True)
elif relative_mean is True:
what = standardize(info[what])
else:
what = info[what]
if below is not None:
keep[what > below] = False
if above is not None:
keep[what < above] = False
info = _signal_findpeaks_filter(info, keep)
return info
See Also
--------
eeg_gfp, microstates_peaks
"""
# If MNE object
if isinstance(eeg, (pd.DataFrame, np.ndarray)) is False:
sampling_rate = eeg.info["sfreq"]
info = eeg.info
eeg = eeg.get_data()
else:
info = None
# Normalization
if standardize_eeg is True:
eeg = standardize(eeg, **kwargs)
# Get GFP
gfp = eeg_gfp(eeg, sampling_rate=sampling_rate, normalize=normalize, method=gfp_method, **kwargs)
# Find peaks in the global field power (GFP) or take a given amount of indices
if train == "gfp":
train = gfp
peaks = microstates_peaks(eeg, gfp=train, sampling_rate=sampling_rate, **kwargs)
return eeg, peaks, gfp, info
def _remove_small(peaks, sampling_rate=1000, interval_min=None, relative_interval_min=None, robust=False):
if interval_min is None and relative_interval_min is None:
return peaks
if interval_min is not None:
interval = signal_period(peaks, sampling_rate=sampling_rate, desired_length=len(peaks))
peaks = peaks[interval > interval_min]
if relative_interval_min is not None:
interval = signal_period(peaks, sampling_rate=sampling_rate, desired_length=len(peaks))
peaks = peaks[standardize(interval, robust=robust) > relative_interval_min]
return peaks
def _signal_findpeaks_keep(
info, what="Height", below=None, above=None, relative_mean=False, relative_median=False, relative_max=False
):
if below is None and above is None:
return info
keep = np.full(len(info["Peaks"]), True)
if relative_max is True:
what = info[what] / np.max(info[what])
elif relative_median is True:
what = standardize(info[what], robust=True)
elif relative_mean is True:
what = standardize(info[what])
else:
what = info[what]
if below is not None:
keep[what > below] = False
if above is not None:
keep[what < above] = False
info = _signal_findpeaks_filter(info, keep)
return info
ax2.set_title("Individual Blinks")
# Create epochs
events = epochs_create(
eog_signals["EOG_Clean"],
peaks["EOG_Blinks"],
sampling_rate=sampling_rate,
epochs_start=-0.3,
epochs_end=0.7,
)
events_array = epochs_to_array(events) # Convert to 2D array
events_array = standardize(events_array) # Rescale so that all the blinks are on the same scale
blinks_df = epochs_to_df(events)
blinks_wide = blinks_df.pivot(index="Time", columns="Label", values="Signal")
blinks_wide = standardize(blinks_wide)
cmap = iter(plt.cm.RdBu(np.linspace(0, 1, num=len(events))))
for x, color in zip(blinks_wide, cmap):
ax2.plot(blinks_wide[x], color=color, linewidth=0.4, zorder=1)
# Plot with their median (used here as a robust average)
ax2.plot(
np.array(blinks_wide.index),
np.median(events_array, axis=1),
linewidth=2,
linestyle="--",
color="black",
label="Median",
)
ax2.legend(loc="upper right")
def _calculate_abs_z(df, columns):
"""This function helps to calculate the absolute standardized distance between R-peaks and other delineated waves
features by `ecg_delineate()`"""
for column in columns:
df["Dist_R_" + column] = np.abs(standardize(df[column].sub(df["ECG_R_Peaks"], axis=0)))
return df
been assigned.
References
----------
.. [1] Pascual-Marqui, R. D., Michel, C. M., & Lehmann, D. (1995).
Segmentation of brain electrical activity into microstates: model
estimation and validation. IEEE Transactions on Biomedical
Engineering.
"""
# If MNE object
if isinstance(eeg, (pd.DataFrame, np.ndarray)) is False:
sampling_rate = eeg.info["sfreq"]
eeg = eeg.get_data()
# Normalization
if normalize is True:
eeg = standardize(eeg, **kwargs)
# Find peaks in the global field power (GFP)
if select == "gfp":
gfp = eeg_gfp(eeg, sampling_rate=sampling_rate, normalize=normalize, robust=robust, method="l1", **kwargs)
else:
gfp = False
peaks = microstates_peaks(eeg, gfp, sampling_rate=sampling_rate, **kwargs)
# Cache this value for later
gfp_sum_sq = np.sum(gfp ** 2)
# Do several runs of the k-means algorithm, keep track of the best
# segmentation.
best_gev = 0