Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
------
TypeError
If no input data for 'nni', 'rpeaks' or 'signal' is provided
Notes
-----
.. NN intervals are derived from the ECG signal if 'signal' is provided.
.. If both 'nni' and 'rpeaks' are provided, 'rpeaks' will be chosen over the 'nn' and the 'nni' data will be computed
from the 'rpeaks'.
.. If both 'nni' and 'signal' are provided, 'nni' will be chosen over 'signal'.
.. If both 'rpeaks' and 'signal' are provided, 'rpeaks' will be chosen over 'signal'.
"""
# Check input
if signal is not None:
rpeaks = biosppy.signals.ecg.ecg(signal=signal, sampling_rate=sampling_rate, show=False)[2]
elif nni is None and rpeaks is None:
raise TypeError('No input data provided. Please specify input data.')
# Get NNI series
nni = check_input(nni, rpeaks)
# Time vector back to ms
t = np.cumsum(nni) / 1000.
# Configure interval of visualized signal
interval = check_interval(interval, limits=[0, t[-1]], default=[0, 10])
# Prepare figure
if figsize is None:
figsize = (12, 4)
fig = plt.figure(figsize=figsize)
t = np.cumsum(nn)
t -= t[0]
# Compute PSD according to the Lomb-Scargle method
# Specify frequency grid
frequencies = np.linspace(0, 0.41, nfft)
# Compute angular frequencies
a_frequencies = np.asarray(2 * np.pi / frequencies)
powers = np.asarray(lombscargle(t, nn, a_frequencies, normalize=True))
# Fix power = inf at f=0
powers[0] = 2
# Apply moving average filter
if ma_size is not None:
powers = biosppy.signals.tools.smoother(powers, size=ma_size)['signal']
# Define metadata
meta = utils.ReturnTuple((nfft, ma_size, ), ('lomb_nfft', 'lomb_ma'))
if mode not in ['normal', 'dev', 'devplot']:
warnings.warn("Unknown mode '%s'. Will proceed with 'normal' mode." % mode, stacklevel=2)
mode = 'normal'
# Normal Mode:
# Returns frequency parameters, PSD plot figure and no frequency & power series/arrays
if mode == 'normal':
# ms^2 to s^2
powers = powers * 10 ** 6
# Compute frequency parameters
params, freq_i = _compute_parameters('lomb', frequencies, powers, fbands)
# Frequency Domain Features
# ==========================
freq_bands = {
"ULF": [0.0001, 0.0033],
"VLF": [0.0033, 0.04],
"LF": [0.04, 0.15],
"HF": [0.15, 0.40],
"VHF": [0.4, 0.5]}
# Frequency-Domain Power over time
freq_powers = {}
for band in freq_bands:
freqs = freq_bands[band]
# Filter to keep only the band of interest
filtered, sampling_rate, params = biosppy.signals.tools.filter_signal(signal=RRi, ftype='butter', band='bandpass', order=1, frequency=freqs, sampling_rate=sampling_rate)
# Apply Hilbert transform
amplitude, phase = biosppy.signals.tools.analytic_signal(filtered)
# Extract Amplitude of Envelope (power)
freq_powers["ECG_HRV_" + band] = amplitude
freq_powers = pd.DataFrame.from_dict(freq_powers)
freq_powers.index = hrv["df"].index
hrv["df"] = pd.concat([hrv["df"], freq_powers], axis=1)
# Compute Power Spectral Density (PSD) using multitaper method
power, freq = mne.time_frequency.psd_array_multitaper(RRi, sfreq=sampling_rate, fmin=0, fmax=0.5, adaptive=False, normalization='length')
def power_in_band(power, freq, band):
power = np.trapz(y=power[(freq >= band[0]) & (freq < band[1])], x=freq[(freq >= band[0]) & (freq < band[1])])
return(power)
in the successive segment N+1. In this case, use the 'overlap' parameter to select if the first element of the
segment should be dropped or not:
.. If True: overlap allowed, returns all NNI but the cumulative sum of the NNI in a segment can be greater
than the specified duration.
.. If False: no overlap allowed, first NNI will be dropped and the cumulative sum of the NNI in a segment
will always be < specified duration.
Raises
------
TypeError
If no input data for 'signal', 'nn' and 'rpeaks' provided.
"""
# Check input
if signal is not None:
signal, rpeaks = biosppy.signals.ecg.ecg(signal=signal, sampling_rate=sampling_rate, show=False)[1:3]
elif nni is None and rpeaks is None:
raise TypeError('No input data provided. Please specify input data.')
nn = tools.check_input(nni, rpeaks)
version = utils.ReturnTuple(('v.' + pyhrv.__version__, ), ('version', ))
# COMPUTE TIME DOMAIN PARAMETERS
# Check for kwargs for the 'kwargs_time'
if kwargs_time is not None:
if type(kwargs_time) is not dict:
raise TypeError("Expected , got %s: 'kwargs_time' must be a dictionary containing "
"parameters (keys) and values for the 'time_domain()' function." % type(kwargs_time))
# Supported kwargs
available_kwargs = ['threshold', 'binsize', 'plot']
- Dominique Makowski (https://github.com/DominiqueMakowski)
*Dependencies*
- biosppy
- numpy
- pandas
*See Also*
- BioSPPY: https://github.com/PIA-Group/BioSPPy
"""
processed_rsp = {"df": pd.DataFrame({"RSP_Raw": np.array(rsp)})}
biosppy_rsp = dict(biosppy.signals.resp.resp(rsp, sampling_rate=sampling_rate, show=False))
processed_rsp["df"]["RSP_Filtered"] = biosppy_rsp["filtered"]
# RSP Rate
# ============
rsp_rate = biosppy_rsp["resp_rate"]*60 # Get RSP rate value (in cycles per minute)
rsp_times = biosppy_rsp["resp_rate_ts"] # the time (in sec) of each rsp rate value
rsp_times = np.round(rsp_times*sampling_rate).astype(int) # Convert to timepoints
try:
rsp_rate = interpolate(rsp_rate, rsp_times, sampling_rate) # Interpolation using 3rd order spline
processed_rsp["df"]["RSP_Rate"] = rsp_rate
except TypeError:
print("NeuroKit Warning: rsp_process(): Sequence too short to compute respiratory rate.")
processed_rsp["df"]["RSP_Rate"] = np.nan
freq_bands = {
"ULF": [0.0001, 0.0033],
"VLF": [0.0033, 0.04],
"LF": [0.04, 0.15],
"HF": [0.15, 0.40],
"VHF": [0.4, 0.5]}
# Frequency-Domain Power over time
freq_powers = {}
for band in freq_bands:
freqs = freq_bands[band]
# Filter to keep only the band of interest
filtered, sampling_rate, params = biosppy.signals.tools.filter_signal(signal=RRi, ftype='butter', band='bandpass', order=1, frequency=freqs, sampling_rate=sampling_rate)
# Apply Hilbert transform
amplitude, phase = biosppy.signals.tools.analytic_signal(filtered)
# Extract Amplitude of Envelope (power)
freq_powers["ECG_HRV_" + band] = amplitude
freq_powers = pd.DataFrame.from_dict(freq_powers)
freq_powers.index = hrv["df"].index
hrv["df"] = pd.concat([hrv["df"], freq_powers], axis=1)
# Compute Power Spectral Density (PSD) using multitaper method
power, freq = mne.time_frequency.psd_array_multitaper(RRi, sfreq=sampling_rate, fmin=0, fmax=0.5, adaptive=False, normalization='length')
def power_in_band(power, freq, band):
power = np.trapz(y=power[(freq >= band[0]) & (freq < band[1])], x=freq[(freq >= band[0]) & (freq < band[1])])
return(power)
# Extract Power according to frequency bands
- Greco et al. (2015): http://ieeexplore.ieee.org/abstract/document/7229284/?reload=true
"""
eda_df = pd.DataFrame({"EDA_Raw": np.array(eda)})
# Convex optimization
if use_cvxEDA is True:
try:
eda = cvxeda(eda, sampling_rate=sampling_rate)
eda_df["EDA_Phasic"] = eda
except:
print("NeuroKit Warning: couln't apply cvxEDA on EDA signal. Using normal.")
# Compute several features using biosppy
biosppy_eda = dict(biosppy.signals.eda.eda(eda, sampling_rate=sampling_rate, show=False))
eda_df["EDA_Filtered"] = biosppy_eda["filtered"]
# Store SCR onsets
scr_onsets = np.array([np.nan]*len(eda))
scr_onsets[biosppy_eda['onsets']] = 1
eda_df["SCR_Onsets"] = scr_onsets
# Store SCR peaks and amplitudes
scr_peaks = np.array([np.nan]*len(eda))
peak_index = 0
for index in range(len(scr_peaks)):
try:
if index == biosppy_eda["peaks"][peak_index]:
scr_peaks[index] = biosppy_eda['amplitudes'][peak_index]
peak_index += 1
ax.yaxis.grid(which='minor', color='salmon', lw=0.3)
ax.set_yticks(y_major)
ax.yaxis.grid(which='major', color='r', lw=0.7)
# Add legend
unit = '' if unit == '-' else unit
text_ = 'Division (x): %is\nDivision (y): %.1f%s' % (n, (np.abs(y_major[1] - y_major[0])), unit)
ax.text(0.88, 0.85, text_, transform=ax.transAxes, fontsize=9,
bbox=dict(boxstyle='round', facecolor='white', alpha=0.9))
# Plot ECG signal
ax.plot(t, signal, 'r')
fig_ecg.tight_layout()
# Plot r-peaks
rps = biosppy.signals.ecg.ecg(signal=signal, sampling_rate=sampling_rate, show=False)[2]
p = [float(signal[x]) for x in rps]
r = t[rps]
if rpeaks:
ax.plot(r, p, 'g*', alpha=0.7)
# Add title
if title is not None:
ax.set_title('ECG Signal - %s' % str(title))
else:
ax.set_title('ECG Signal')
# Show plot
if show:
plt.show()
# Output