Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# The two literals below are adjacent (no comma between them) so that Python
# concatenates them into ONE message string. With a comma, ImportError would
# receive two positional arguments and the message would render as a tuple.
raise ImportError(
    "NeuroKit error: ecg_delineator(): the 'PyWavelets' module is required for this method to run. "
    "Please install it first (`pip install PyWavelets`)."
)
# first derivative of the Gaussian signal
scales = np.array([1, 2, 4, 8, 16])
cwtmatr, __ = pywt.cwt(ecg, scales, "gaus1", sampling_period=1.0 / sampling_rate)
max_search_duration = 0.05
tppeaks = []
for index_cur, index_next in zip(keep_tp[:-1], keep_tp[1:]):
# limit 1
correct_sign = cwtmatr[4, :][index_cur] < 0 and cwtmatr[4, :][index_next] > 0 # pylint: disable=R1716
# near = (index_next - index_cur) < max_wv_peak_dist #limit 2
# if near and correct_sign:
if correct_sign:
index_zero_cr = signal_zerocrossings(cwtmatr[4, :][index_cur:index_next])[0] + index_cur
nb_idx = int(max_search_duration * sampling_rate)
index_max = np.argmax(ecg[index_zero_cr - nb_idx : index_zero_cr + nb_idx]) + (index_zero_cr - nb_idx)
tppeaks.append(index_max)
return tppeaks
References
----------
- Kim, K. H., Bang, S. W., & Kim, S. R. (2004). Emotion recognition system using short-term monitoring
of physiological signals. Medical and biological engineering and computing, 42(3), 419-427.
"""
# differentiation
df = np.diff(eda_phasic)
# smooth
df = signal_smooth(signal=df, kernel="bartlett", size=int(sampling_rate))
# zero crosses
zeros = signal_zerocrossings(df)
if np.all(df[: zeros[0]] > 0):
zeros = zeros[1:]
if np.all(df[zeros[-1] :] > 0):
zeros = zeros[:-1]
# exclude SCRs with small amplitude
thr = amplitude_min * np.max(df)
scrs, amps, ZC, pks = [], [], [], []
for i in range(0, len(zeros) - 1, 2):
scrs += [df[zeros[i] : zeros[i + 1]]]
aux = scrs[-1].max()
if aux > thr:
amps += [aux]
ZC += [zeros[i]]
ZC += [zeros[i + 1]]
def _embedding_delay_select(metric_values, algorithm="first local minimum"):
    """Select the optimal delay from a curve of metric values.

    Parameters
    ----------
    metric_values : array
        Metric values to search. Arithmetic and ``np.diff`` are applied to it, so it is
        presumably a 1D numpy array with one value per candidate delay (verify against caller).
    algorithm : str
        Selection rule. Must be one of ``"first local minimum"``, ``"first 1/e crossing"``,
        ``"first zero crossing"`` or ``"closest to 40% of the slope"``.

    Returns
    -------
    optimal
        The first candidate found by the chosen rule. For the slope rule this is an index
        into the differenced values; for the other rules it is the first element returned
        by ``signal_findpeaks`` / ``signal_zerocrossings`` (presumably a sample index).

    Raises
    ------
    ValueError
        If ``algorithm`` is not one of the supported names.

    Notes
    -----
    Each rule takes the first element of its candidates, so a curve on which the rule finds
    no candidate is expected to fail on that indexing step (presumably with an IndexError).
    """
    if algorithm == "first local minimum":
        # Minima of the curve are located as peaks of its negation.
        optimal = signal_findpeaks(-1 * metric_values, relative_height_min=0.1, relative_max=True)["Peaks"][0]
    elif algorithm == "first 1/e crossing":
        # Shift the curve so that the 1/e level sits at zero, then reuse the zero-crossing search.
        metric_values = metric_values - 1 / np.exp(1)
        optimal = signal_zerocrossings(metric_values)[0]
    elif algorithm == "first zero crossing":
        optimal = signal_zerocrossings(metric_values)[0]
    elif algorithm == "closest to 40% of the slope":
        # Slope between consecutive values, scaled by the curve length and expressed in degrees.
        slope = np.diff(metric_values) * len(metric_values)
        slope_in_deg = np.rad2deg(np.arctan(slope))
        optimal = np.where(slope_in_deg == find_closest(40, slope_in_deg))[0][0]
    else:
        # Without this branch an unknown name would reach `return optimal` with the variable
        # unbound and surface as an opaque UnboundLocalError.
        raise ValueError(
            "NeuroKit error: _embedding_delay_select(): 'algorithm' must be one of "
            "'first local minimum', 'first 1/e crossing', 'first zero crossing' or "
            "'closest to 40% of the slope', but got " + repr(algorithm) + "."
        )
    return optimal
def _hrv_nonlinear_fragmentation(rri, out):
"""Heart Rate Fragmentation Indices - Costa (2017)
The more fragmented a time series is, the higher the PIP, IALS, PSS, and PAS indices will be.
"""
diff_rri = np.diff(rri)
zerocrossings = signal_zerocrossings(diff_rri)
# Percentage of inflection points (PIP)
out["PIP"] = len(zerocrossings) / len(rri)
# Inverse of the average length of the acceleration/deceleration segments (IALS)
accelerations = np.where(diff_rri > 0)[0]
decelerations = np.where(diff_rri < 0)[0]
consecutive = np.concatenate([find_consecutive(accelerations), find_consecutive(decelerations)])
lengths = [len(i) for i in consecutive]
out["IALS"] = 1 / np.average(lengths)
# Percentage of short segments (PSS) - The complement of the percentage of NN intervals in
# acceleration and deceleration segments with three or more NN intervals
out["PSS"] = np.sum(np.asarray(lengths) < 3) / len(lengths)
# Percentage of NN intervals in alternation segments (PAS). An alternation segment is a sequence
max_value = epochs[i].Signal.max()
if np.all(-0.51 < t < -0.3):
# Trim start of epoch
epochs[i] = epochs[i][-0.3:0.5]
max_value = epochs[i].Signal.max()
# Find position of peak
max_frame = epochs[i]["Index"].loc[epochs[i]["Signal"] == max_value]
max_frame = np.array(max_frame)
if len(max_frame) > 1:
max_frame = max_frame[0] # If two points achieve max value, first one is blink
else:
max_frame = int(max_frame)
# left and right zero markers
crossings = signal_zerocrossings(epochs[i].Signal)
crossings_idx = epochs[i]["Index"].iloc[crossings]
crossings_idx = np.sort(np.append([np.array(crossings_idx)], [max_frame]))
max_position = int(np.where(crossings_idx == max_frame)[0])
if (max_position - 1) >= 0: # crosses zero point
leftzero = crossings_idx[max_position - 1]
else:
max_value_t = epochs[i].Signal.idxmax()
sliced_before = epochs[i].loc[slice(max_value_t), :]
leftzero = sliced_before["Index"].loc[sliced_before["Signal"] == sliced_before["Signal"].min()]
leftzero = int(np.array(leftzero))
if (max_position + 1) < len(crossings_idx): # crosses zero point
rightzero = crossings_idx[max_position + 1]
else:
max_value_t = epochs[i].Signal.idxmax()
max_value = epochs[i].Signal.max()
if np.all(-0.51 < t < -0.3):
# Trim start of epoch
epochs[i] = epochs[i][-0.3:0.5]
max_value = epochs[i].Signal.max()
# Find position of peak
max_frame = epochs[i]["Index"].loc[epochs[i]["Signal"] == max_value]
max_frame = np.array(max_frame)
if len(max_frame) > 1:
max_frame = max_frame[0] # If two points achieve max value, first one is blink
else:
max_frame = int(max_frame)
# left and right zero markers
crossings = signal_zerocrossings(epochs[i].Signal)
crossings_idx = epochs[i]["Index"].iloc[crossings]
crossings_idx = np.sort(np.append([np.array(crossings_idx)], [max_frame]))
max_position = int(np.where(crossings_idx == max_frame)[0])
if (max_position - 1) >= 0: # crosses zero point
leftzero = crossings_idx[max_position - 1]
else:
max_value_t = epochs[i].Signal.idxmax()
sliced_before = epochs[i].loc[slice(max_value_t), :]
leftzero = sliced_before["Index"].loc[sliced_before["Signal"] == sliced_before["Signal"].min()]
leftzero = np.array(leftzero)
if (max_position + 1) < len(crossings_idx): # crosses zero point
rightzero = crossings_idx[max_position + 1]
else:
max_value_t = epochs[i].Signal.idxmax()
epsilon_1 = np.sqrt(np.mean(np.square(signal_1)))
peaks_1, _ = scipy.signal.find_peaks(np.abs(signal_1), height=epsilon_1)
# Keep only peaks_1 that are nearest to peaks_2
peaks_1_keep = np.zeros_like(peaks_4)
for i in range(len(peaks_4)):
peaks_distance = abs(peaks_2_keep[i] - peaks_1)
peaks_1_keep[i] = peaks_1[np.argmin(peaks_distance)]
# Find R peaks
max_R_peak_dist = int(0.1 * sampling_rate)
rpeaks = []
for index_cur, index_next in zip(peaks_1_keep[:-1], peaks_1_keep[1:]):
correct_sign = signal_1[index_cur] < 0 and signal_1[index_next] > 0 # pylint: disable=R1716
near = (index_next - index_cur) < max_R_peak_dist # limit 2
if near and correct_sign:
rpeaks.append(signal_zerocrossings(signal_1[index_cur:index_next])[0] + index_cur)
rpeaks = np.array(rpeaks, dtype="int")
return rpeaks
ppeaks.append(np.nan)
continue
ecg_local = ecg[srch_idx_start:srch_idx_end]
peaks, __ = scipy.signal.find_peaks(np.abs(dwt_local), height=height)
peaks = list(filter(lambda p: np.abs(dwt_local[p]) > 0.025 * max(dwt_local), peaks))
if dwt_local[0] > 0: # just append
peaks = [0] + peaks
# detect morphology
candidate_peaks = []
candidate_peaks_scores = []
for idx_peak, idx_peak_nxt in zip(peaks[:-1], peaks[1:]):
correct_sign = dwt_local[idx_peak] > 0 and dwt_local[idx_peak_nxt] < 0 # pylint: disable=R1716
if correct_sign:
idx_zero = signal_zerocrossings(dwt_local[idx_peak:idx_peak_nxt])[0] + idx_peak
# This is the score assigned to each peak. The peak with the highest score will be
# selected.
score = ecg_local[idx_zero] - abs(
float(idx_zero) / sampling_rate - p2r_duration
) # Minus p2r because of the srch_idx_start
candidate_peaks.append(idx_zero)
candidate_peaks_scores.append(score)
if not candidate_peaks:
ppeaks.append(np.nan)
continue
ppeaks.append(candidate_peaks[np.argmax(candidate_peaks_scores)] + srch_idx_start)
return tpeaks, ppeaks