Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
>>> peaks = np.sort(np.append(peaks, [1350, 11350, 18350])) # add artifacts
>>>
>>> peaks_corrected = nk.signal_fixpeaks(peaks=peaks, interval_min=0.5, interval_max=1.5, method="Neurokit")
>>> # Plot and shift original peaks to the right to see the difference.
>>> fig = nk.events_plot([peaks + 50, peaks_corrected], signal)
>>> fig #doctest: +SKIP
References
----------
- Lipponen, J. A., & Tarvainen, M. P. (2019). A robust algorithm for heart rate variability time
series artefact correction using novel beat classification. Journal of medical engineering & technology,
43(3), 173-181. 10.1080/03091902.2019.1640306
"""
# Format input
peaks = _signal_formatpeaks_sanitize(peaks)
# If method Kubios
if method.lower() == "kubios":
return _signal_fixpeaks_kubios(peaks, sampling_rate=sampling_rate, iterative=iterative, show=show)
# Else method is NeuroKit
return _signal_fixpeaks_neurokit(
peaks,
sampling_rate=sampling_rate,
interval_min=interval_min,
interval_max=interval_max,
relative_interval_min=relative_interval_min,
relative_interval_max=relative_interval_max,
robust=robust,
)
def _rsp_fixpeaks_retrieve(peaks, troughs=None, desired_length=None):
    """Resolve the peaks, troughs and desired length from the caller's input.

    ``peaks`` is run through ``_signal_formatpeaks_sanitize`` with
    ``key="Peaks"``. When ``troughs`` is not supplied, the same original input
    is run through the helper a second time with ``key="Troughs"``.

    Parameters
    ----------
    peaks : object
        Input handed to ``_signal_formatpeaks_sanitize``. Presumably an array
        of indices or a container holding "Peaks"/"Troughs" entries -- confirm
        against the helper.
    troughs : object, optional
        Returned untouched when it is not None.
    desired_length : object, optional
        Forwarded to the helper; replaced by the value the helper returns from
        the "Peaks" call.

    Returns
    -------
    tuple
        ``(peaks, troughs, desired_length)``.
    """
    # Keep a handle on the unsanitized input: `peaks` is rebound just below,
    # and the troughs lookup must read from what the caller actually passed.
    source = peaks
    peaks, desired_length = _signal_formatpeaks_sanitize(source, desired_length, key="Peaks")

    # Caller-supplied troughs take precedence over anything found in `source`.
    if troughs is not None:
        return peaks, troughs, desired_length

    # The length returned by this second call is discarded.
    troughs, _length = _signal_formatpeaks_sanitize(source, desired_length, key="Troughs")
    return peaks, troughs, desired_length
rsp_signals = rsp_signals[0]
if isinstance(rsp_signals, pd.DataFrame):
rsp_cols = [col for col in rsp_signals.columns if "RSP_Phase" in col]
if len(rsp_cols) != 2:
edr = ecg_rsp(ecg_period, sampling_rate=sampling_rate)
rsp_signals, _ = rsp_process(edr, sampling_rate)
print(
"NeuroKit warning: _hrv_rsa_formatinput():"
"RSP signal not found. RSP signal is derived from ECG using"
"ecg_rsp(). Please provide RSP signal."
)
if rpeaks is None:
try:
rpeaks = _signal_formatpeaks_sanitize(ecg_signals)
except NameError:
raise ValueError("NeuroKit error: _hrv_rsa_formatinput(): Wrong input, we couldn't extract rpeaks indices.")
else:
rpeaks = _signal_formatpeaks_sanitize(rpeaks)
signals = pd.concat([ecg_signals, rsp_signals], axis=1)
# RSP signal
if "RSP_Clean" in signals.columns:
rsp_signal = signals["RSP_Clean"].values
elif "RSP_Raw" in signals.columns:
rsp_signal = signals["RSP_Raw"].values
elif "RSP" in signals.columns:
rsp_signal = signals["RSP"].values
else:
rsp_signal = None
def _eda_fixpeaks_retrieve(peaks, onsets=None, height=None, desired_length=None):
    """Resolve peaks, onsets, height and desired length from the caller's input.

    ``peaks`` is run through ``_signal_formatpeaks_sanitize`` with
    ``key="Peaks"``. Each of ``onsets`` and ``height`` that is not supplied is
    looked up from the same original input with ``key="Onsets"`` and
    ``key="Height"`` respectively.

    Parameters
    ----------
    peaks : object
        Input handed to ``_signal_formatpeaks_sanitize``. Presumably an array
        of indices or a container holding "Peaks"/"Onsets"/"Height" entries --
        confirm against the helper.
    onsets : object, optional
        Returned untouched when it is not None.
    height : object, optional
        Returned untouched when it is not None.
    desired_length : object, optional
        Forwarded to the helper; replaced by the value the helper returns from
        the "Peaks" call.

    Returns
    -------
    tuple
        ``(peaks, onsets, height, desired_length)``.
    """
    # Keep a handle on the unsanitized input: `peaks` is rebound just below,
    # and the fallback lookups must read from what the caller actually passed.
    source = peaks
    peaks, desired_length = _signal_formatpeaks_sanitize(source, desired_length, key="Peaks")

    # Only fill in what the caller left out; the length returned by these
    # secondary calls is discarded.
    if onsets is None:
        onsets, _length = _signal_formatpeaks_sanitize(source, desired_length, key="Onsets")
    if height is None:
        height, _length = _signal_formatpeaks_sanitize(source, desired_length, key="Height")

    return peaks, onsets, height, desired_length
def _eda_fixpeaks_retrieve(peaks, onsets=None, height=None, desired_length=None):
    """Resolve peaks, onsets, height and desired length from the caller's input.

    ``peaks`` is run through ``_signal_formatpeaks_sanitize`` with
    ``key="Peaks"``. Each of ``onsets`` and ``height`` that is not supplied is
    looked up from the same original input with ``key="Onsets"`` and
    ``key="Height"`` respectively.

    Parameters
    ----------
    peaks : object
        Input handed to ``_signal_formatpeaks_sanitize``. Presumably an array
        of indices or a container holding "Peaks"/"Onsets"/"Height" entries --
        confirm against the helper.
    onsets : object, optional
        Returned untouched when it is not None.
    height : object, optional
        Returned untouched when it is not None.
    desired_length : object, optional
        Forwarded to the helper; replaced by the value the helper returns from
        the "Peaks" call.

    Returns
    -------
    tuple
        ``(peaks, onsets, height, desired_length)``.
    """
    # Keep a handle on the unsanitized input: `peaks` is rebound just below,
    # and the fallback lookups must read from what the caller actually passed.
    source = peaks
    peaks, desired_length = _signal_formatpeaks_sanitize(source, desired_length, key="Peaks")

    # Only fill in what the caller left out; the length returned by these
    # secondary calls is discarded.
    if onsets is None:
        onsets, _length = _signal_formatpeaks_sanitize(source, desired_length, key="Onsets")
    if height is None:
        height, _length = _signal_formatpeaks_sanitize(source, desired_length, key="Height")

    return peaks, onsets, height, desired_length
>>> rpeaks_uncorrected = nk.ecg_findpeaks(ecg)
>>> artifacts, rpeaks_corrected = nk.ecg_fixpeaks(rpeaks_uncorrected,
...                                               recursive=True,
...                                               show=True)
>>> rate_corrected = nk.ecg_rate(rpeaks_corrected,
...                              desired_length=len(ecg))
>>> rate_uncorrected = nk.ecg_rate(rpeaks_uncorrected,
...                                desired_length=len(ecg))
>>>
>>> fig, ax = plt.subplots()
>>> ax.plot(rate_uncorrected, label="heart rate without artifact correction")
>>> ax.plot(rate_corrected, label="heart rate with artifact correction")
>>> ax.legend(loc="upper right")
"""
# Get R-peaks indices from DataFrame or dict.
rpeaks, desired_length = _signal_formatpeaks_sanitize(rpeaks, desired_length=desired_length)
rr = np.ediff1d(rpeaks, to_begin=0) / sampling_rate
# The rate corresponding to the first peak is set to the mean RR.
rr[0] = np.mean(rr[1:])
rate = 60 / rr
if desired_length is not None:
rate = signal_interpolate(rpeaks, rate, desired_length=desired_length,
method=interpolation_method)
return rate
def _rsp_fixpeaks_retrieve(peaks, troughs=None, desired_length=None):
    """Resolve the peaks, troughs and desired length from the caller's input.

    ``peaks`` is run through ``_signal_formatpeaks_sanitize`` with
    ``key="Peaks"``. When ``troughs`` is not supplied, the same original input
    is run through the helper a second time with ``key="Troughs"``.

    Parameters
    ----------
    peaks : object
        Input handed to ``_signal_formatpeaks_sanitize``. Presumably an array
        of indices or a container holding "Peaks"/"Troughs" entries -- confirm
        against the helper.
    troughs : object, optional
        Returned untouched when it is not None.
    desired_length : object, optional
        Forwarded to the helper; replaced by the value the helper returns from
        the "Peaks" call.

    Returns
    -------
    tuple
        ``(peaks, troughs, desired_length)``.
    """
    # Keep a handle on the unsanitized input: `peaks` is rebound just below,
    # and the troughs lookup must read from what the caller actually passed.
    source = peaks
    peaks, desired_length = _signal_formatpeaks_sanitize(source, desired_length, key="Peaks")

    # Caller-supplied troughs take precedence over anything found in `source`.
    if troughs is not None:
        return peaks, troughs, desired_length

    # The length returned by this second call is discarded.
    troughs, _length = _signal_formatpeaks_sanitize(source, desired_length, key="Troughs")
    return peaks, troughs, desired_length
def _eda_fixpeaks_retrieve(peaks, onsets=None, height=None, desired_length=None):
    """Resolve peaks, onsets, height and desired length from the caller's input.

    ``peaks`` is run through ``_signal_formatpeaks_sanitize`` with
    ``key="Peaks"``. Each of ``onsets`` and ``height`` that is not supplied is
    looked up from the same original input with ``key="Onsets"`` and
    ``key="Height"`` respectively.

    Parameters
    ----------
    peaks : object
        Input handed to ``_signal_formatpeaks_sanitize``. Presumably an array
        of indices or a container holding "Peaks"/"Onsets"/"Height" entries --
        confirm against the helper.
    onsets : object, optional
        Returned untouched when it is not None.
    height : object, optional
        Returned untouched when it is not None.
    desired_length : object, optional
        Forwarded to the helper; replaced by the value the helper returns from
        the "Peaks" call.

    Returns
    -------
    tuple
        ``(peaks, onsets, height, desired_length)``.
    """
    # Keep a handle on the unsanitized input: `peaks` is rebound just below,
    # and the fallback lookups must read from what the caller actually passed.
    source = peaks
    peaks, desired_length = _signal_formatpeaks_sanitize(source, desired_length, key="Peaks")

    # Only fill in what the caller left out; the length returned by these
    # secondary calls is discarded.
    if onsets is None:
        onsets, _length = _signal_formatpeaks_sanitize(source, desired_length, key="Onsets")
    if height is None:
        height, _length = _signal_formatpeaks_sanitize(source, desired_length, key="Height")

    return peaks, onsets, height, desired_length