Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _hrv_rsa_pb(ecg_period, sampling_rate, continuous=False):
"""Porges-Bohrer method."""
if continuous is True:
return None
# Re-sample at 2 Hz
resampled = signal_resample(ecg_period, sampling_rate=sampling_rate, desired_sampling_rate=2)
# Fit 21-point cubic polynomial filter (zero mean, 3rd order)
# with a low-pass cutoff frequency of 0.095Hz
trend = signal_filter(
resampled, sampling_rate=2, lowcut=0.095, highcut=None, method="savgol", order=3, window_size=21
)
zero_mean = resampled - trend
# Remove variance outside bandwidth of spontaneous respiration
zero_mean_filtered = signal_filter(zero_mean, sampling_rate=2, lowcut=0.12, highcut=0.40)
# Divide into 30-second epochs
time = np.arange(0, len(zero_mean_filtered)) / 2
time = pd.DataFrame({"Epoch Index": time // 30, "Signal": zero_mean_filtered})
time = time.set_index("Epoch Index")
for quiet in range(n_quiet):
quiets += [list(np.random.uniform(-0.05, 0.05, size=int(1000 * duration_quiet)) + 0.08)]
# Merge the two
emg = []
for i in range(len(quiets)):
emg += quiets[i]
if i < len(bursts):
emg += bursts[i]
emg = np.array(emg)
# Add random (gaussian distributed) noise
emg += np.random.normal(0, noise, len(emg))
# Resample
emg = signal_resample(emg, sampling_rate=1000, desired_length=length, desired_sampling_rate=sampling_rate)
return emg
cardiac = scipy.signal.wavelets.daub(10)
# Add the gap after the pqrst when the heart is resting.
cardiac = np.concatenate([cardiac, np.zeros(10)])
# Caculate the number of beats in capture time period
num_heart_beats = int(duration * heart_rate / 60)
# Concatenate together the number of heart beats needed
ecg = np.tile(cardiac, num_heart_beats)
# Change amplitude
ecg = ecg * 10
# Resample
ecg = signal_resample(
ecg, sampling_rate=int(len(ecg) / 10), desired_length=length, desired_sampling_rate=sampling_rate
)
return ecg
fhi = 0.25
flostd = 0.01
fhistd = 0.01
fid = 1
# Calculate time scales for rr and total output
sfrr = 1
trr = 1/sfrr
tstep = 1/sfecg
rrmean = 60/hrmean
n = 2**(np.ceil(np.log2(N*rrmean/trr)))
rr0 = _ecg_simulate_rrprocess(flo, fhi, flostd, fhistd, lfhfratio, hrmean, hrstd, sfrr, n)
# Upsample rr time series from 1 Hz to sfint Hz
rr = signal_resample(rr0, sampling_rate=1, desired_sampling_rate=sfint)
# Make the rrn time series
dt = 1/sfint
rrn = np.zeros(len(rr))
tecg = 0
i = 0
while i < len(rr):
tecg = tecg+rr[i]
ip = int(np.round(tecg/dt))
rrn[i:ip] = rr[i]
i = ip
Nt = ip
# Integrate system using fourth order Runge-Kutta
x0 = np.array([1, 0, 0.04])
The cleaned ECG channel as returned by `ecg_clean()`.
rpeaks : Union[list, np.array, pd.Series]
The samples at which R-peaks occur. Accessible with the key "ECG_R_Peaks" in the info dictionary
returned by `ecg_findpeaks()`.
sampling_rate : int
The sampling frequency of `ecg_signal` (in Hz, i.e., samples/second).
analysis_sampling_rate : int
The sampling frequency for analysis (in Hz, i.e., samples/second).
Returns
--------
dict
Dictionary of the points.
"""
ecg = signal_resample(ecg, sampling_rate=sampling_rate, desired_sampling_rate=analysis_sampling_rate)
dwtmatr = _dwt_compute_multiscales(ecg, 9)
# # only for debugging
# for idx in [0, 1, 2, 3]:
# plt.plot(dwtmatr[idx + 3], label=f'W[{idx}]')
# plt.plot(ecg, '--')
# plt.legend()
# plt.grid(True)
# plt.show()
rpeaks_resampled = _dwt_resample_points(rpeaks, sampling_rate, analysis_sampling_rate)
tpeaks, ppeaks = _dwt_delineate_tp_peaks(ecg, rpeaks_resampled, dwtmatr, sampling_rate=analysis_sampling_rate)
qrs_onsets, qrs_offsets = _dwt_delineate_qrs_bounds(
rpeaks_resampled, dwtmatr, ppeaks, tpeaks, sampling_rate=analysis_sampling_rate
)
ponsets, poffsets = _dwt_delineate_tp_onsets_offsets(ppeaks, dwtmatr, sampling_rate=analysis_sampling_rate)
for channel in file.named_channels:
freq_list.append(file.named_channels[channel].samples_per_second)
sampling_rate = np.max(freq_list)
# Loop through channels
data = {}
for channel in file.named_channels:
signal = np.array(file.named_channels[channel].data)
# Fill signal interruptions
if impute_missing is True and np.isnan(np.sum(signal)):
signal = pd.Series(signal).fillna(method="pad").values
# Resample if necessary
if file.named_channels[channel].samples_per_second != sampling_rate:
signal = signal_resample(
signal,
sampling_rate=file.named_channels[channel].samples_per_second,
desired_sampling_rate=sampling_rate,
method=resample_method,
)
data[channel] = signal
# Sanitize lengths
lengths = []
for channel in data:
lengths += [len(data[channel])]
if len(set(lengths)) > 1: # If different lengths
length = pd.Series(lengths).mode()[0] # Find most common (target length)
for channel in data:
if len(data[channel]) > length:
data[channel] = data[channel][0:length]
# flo and fhi correspond to the Mayer waves and respiratory rate respectively
flo = 0.1
fhi = 0.25
flostd = 0.01
fhistd = 0.01
# Calculate time scales for rr and total output
sfrr = 1
trr = 1 / sfrr
rrmean = 60 / hrmean
n = 2 ** (np.ceil(np.log2(N * rrmean / trr)))
rr0 = _ecg_simulate_rrprocess(flo, fhi, flostd, fhistd, lfhfratio, hrmean, hrstd, sfrr, n)
# Upsample rr time series from 1 Hz to sfint Hz
rr = signal_resample(rr0, sampling_rate=1, desired_sampling_rate=sfint)
# Make the rrn time series
dt = 1 / sfint
rrn = np.zeros(len(rr))
tecg = 0
i = 0
while i < len(rr):
tecg += rr[i]
ip = int(np.round(tecg / dt))
rrn[i:ip] = rr[i]
i = ip
Nt = ip
# Integrate system using fourth order Runge-Kutta
x0 = np.array([1, 0, 0.04])
def _eda_sympathetic_ghiasi(eda_signal, sampling_rate=1000, frequency_band=[0.045, 0.25], show=True, out={}):
min_frequency = frequency_band[0]
max_frequency = frequency_band[1]
# Downsample, normalize, filter
desired_sampling_rate = 50
downsampled = signal_resample(eda_signal, sampling_rate=sampling_rate, desired_sampling_rate=desired_sampling_rate)
normalized = standardize(downsampled)
filtered = signal_filter(normalized, sampling_rate=desired_sampling_rate, lowcut=0.01, highcut=0.5, method='butterworth')
# Divide the signal into segments and obtain the timefrequency representation
nperseg = int((5 / min_frequency) * desired_sampling_rate)
overlap = 59 / 50 # overlap of 59s in samples
frequency, time, bins = scipy.signal.spectrogram(filtered, fs=desired_sampling_rate, window='blackman',
nperseg=nperseg, noverlap=overlap)
lower_bound = len(frequency) - len(frequency[frequency > min_frequency])
f = frequency[(frequency > min_frequency) & (frequency < max_frequency)]
z = bins[lower_bound:lower_bound + len(f)]
# Visualization
if show is True:
fig = plt.figure()