Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Uses the same defaults as `BioSPPy.
`_.
"""
# Parameters
order = 4
frequency = 5
frequency = 2 * np.array(frequency) / sampling_rate # Normalize frequency to Nyquist Frequency (Fs/2).
# Filtering
b, a = scipy.signal.butter(N=order, Wn=frequency, btype="lowpass", analog=False, output="ba")
filtered = scipy.signal.filtfilt(b, a, eda_signal)
# Smoothing
clean = signal_smooth(filtered, method="convolution", kernel="boxzen", size=int(0.75 * sampling_rate))
return clean
if show:
fig, (ax0, ax1) = plt.subplots(nrows=2, ncols=1, sharex=True)
ax0.plot(signal, label="filtered")
# Ignore the samples with negative amplitudes and square the samples with
# values larger than zero.
signal[signal < 0] = 0
sqrd = signal ** 2
# Compute the thresholds for peak detection. Call with show=True in order
# to visualize thresholds.
ma_peak_kernel = int(np.rint(peakwindow * sampling_rate))
ma_peak = signal_smooth(sqrd, kernel="boxcar", size=ma_peak_kernel)
ma_beat_kernel = int(np.rint(beatwindow * sampling_rate))
ma_beat = signal_smooth(sqrd, kernel="boxcar", size=ma_beat_kernel)
thr1 = ma_beat + beatoffset * np.mean(sqrd) # threshold 1
if show:
ax1.plot(sqrd, label="squared")
ax1.plot(thr1, label="threshold")
ax1.legend(loc="upper right")
# Identify start and end of PPG waves.
waves = ma_peak > thr1
beg_waves = np.where(np.logical_and(np.logical_not(waves[0:-1]), waves[1:]))[0]
end_waves = np.where(np.logical_and(waves[0:-1], np.logical_not(waves[1:])))[0]
# Throw out wave-ends that precede first wave-start.
end_waves = end_waves[end_waves > beg_waves[0]]
# Identify systolic peaks within waves (ignore waves that are too short).
def _eda_phasic_mediansmooth(eda_signal, sampling_rate=1000, smoothing_factor=4):
"""One of the two methods available in biopac's acqknowledge (https://www.biopac.com/knowledge-base/phasic-eda-issue/)
"""
size = smoothing_factor * sampling_rate
tonic = signal_smooth(eda_signal, kernel="median", size=size)
phasic = eda_signal - tonic
out = pd.DataFrame({"EDA_Tonic": np.array(tonic), "EDA_Phasic": np.array(phasic)})
return out
"""All tune-able parameters are specified as keyword arguments.
The `signal` must be the highpass-filtered raw ECG with a lowcut of .5 Hz.
"""
if show is True:
__, (ax1, ax2) = plt.subplots(nrows=2, ncols=1, sharex=True)
# Compute the ECG's gradient as well as the gradient threshold. Run with
# show=True in order to get an idea of the threshold.
grad = np.gradient(signal)
absgrad = np.abs(grad)
smooth_kernel = int(np.rint(smoothwindow * sampling_rate))
avg_kernel = int(np.rint(avgwindow * sampling_rate))
smoothgrad = signal_smooth(absgrad, kernel="boxcar", size=smooth_kernel)
avggrad = signal_smooth(smoothgrad, kernel="boxcar", size=avg_kernel)
gradthreshold = gradthreshweight * avggrad
mindelay = int(np.rint(sampling_rate * mindelay))
if show is True:
ax1.plot(signal)
ax2.plot(smoothgrad)
ax2.plot(gradthreshold)
# Identify start and end of QRS complexes.
qrs = smoothgrad > gradthreshold
beg_qrs = np.where(np.logical_and(np.logical_not(qrs[0:-1]), qrs[1:]))[0]
end_qrs = np.where(np.logical_and(qrs[0:-1], np.logical_not(qrs[1:])))[0]
# Throw out QRS-ends that precede first QRS-start.
end_qrs = end_qrs[end_qrs > beg_qrs[0]]
# Identify R-peaks within QRS (ignore QRS that are too short).
def _ecg_delineator_derivative_P_onset(rpeak, heartbeat, R, P):
if P is None:
return np.nan, None
segment = heartbeat.iloc[:P] # Select left of P wave
signal = signal_smooth(segment["Signal"].values, size=R/10)
signal = np.gradient(np.gradient(signal))
P_onset = np.argmax(signal)
from_R = R - P_onset # Relative to R
return rpeak - from_R
def _ecg_delineator_derivative_T_offset(rpeak, heartbeat, R, T):
if T is None:
return np.nan, None
segment = heartbeat.iloc[R + T:] # Select left of P wave
signal = signal_smooth(segment["Signal"].values, size=R/10)
signal = np.gradient(np.gradient(signal))
T_offset = np.argmax(signal)
return rpeak + T + T_offset
def _ecg_delineator_peak_T_offset(rpeak, heartbeat, R, T):
if T is None:
return np.nan
segment = heartbeat.iloc[R + T :] # Select left of P wave
try:
signal = signal_smooth(segment["Signal"].values, size=R / 10)
except TypeError:
signal = segment["Signal"]
if len(signal) < 2:
return np.nan
signal = np.gradient(np.gradient(signal))
T_offset = np.argmax(signal)
return rpeak + T + T_offset
def _ecg_delineator_peak_P_onset(rpeak, heartbeat, R, P):
if P is None:
return np.nan
segment = heartbeat.iloc[:P] # Select left of P wave
try:
signal = signal_smooth(segment["Signal"].values, size=R / 10)
except TypeError:
signal = segment["Signal"]
if len(signal) < 2:
return np.nan
signal = np.gradient(np.gradient(signal))
P_onset = np.argmax(signal)
from_R = R - P_onset # Relative to R
return rpeak - from_R
with a lowcut of .5 Hz, a highcut of 8 Hz.
"""
if show:
fig, (ax0, ax1) = plt.subplots(nrows=2, ncols=1, sharex=True)
ax0.plot(signal, label="filtered")
# Ignore the samples with negative amplitudes and square the samples with
# values larger than zero.
signal[signal < 0] = 0
sqrd = signal ** 2
# Compute the thresholds for peak detection. Call with show=True in order
# to visualize thresholds.
ma_peak_kernel = int(np.rint(peakwindow * sampling_rate))
ma_peak = signal_smooth(sqrd, kernel="boxcar", size=ma_peak_kernel)
ma_beat_kernel = int(np.rint(beatwindow * sampling_rate))
ma_beat = signal_smooth(sqrd, kernel="boxcar", size=ma_beat_kernel)
thr1 = ma_beat + beatoffset * np.mean(sqrd) # threshold 1
if show:
ax1.plot(sqrd, label="squared")
ax1.plot(thr1, label="threshold")
ax1.legend(loc="upper right")
# Identify start and end of PPG waves.
waves = ma_peak > thr1
beg_waves = np.where(np.logical_and(np.logical_not(waves[0:-1]), waves[1:]))[0]
end_waves = np.where(np.logical_and(waves[0:-1], np.logical_not(waves[1:])))[0]
# Throw out wave-ends that precede first wave-start.