Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import matplotlib.pyplot as plt
from scipy.stats.stats import pearsonr
from astropy.stats import LombScargle
#Getting the data
f = open("sampleDataEcgEda.txt", "r")
dataEcg = []
for line in f:
lineValues = line.split(",")
dataEcg.append(float(lineValues[1]))
#Perform QRS detection
ecgOut = ecg.ecg(signal=dataEcg, sampling_rate=1000., show=False)
#Calculate RR Tachogram
rPeaks = ecgOut[2]
rrTachogram = []
prevPeak = rPeaks[0]
for peak in rPeaks[1:(len(rPeaks))]:
rrTachogram.append(peak - prevPeak)
prevPeak = peak
#Calculate median heartbeat template
templatesForCorrCoef = ecgOut[4]
templates = templatesForCorrCoef
medianTemplate = [x / len(templates) for x in [sum(x) for x in zip(*templates)]]
#Calculate correlation coeffcients
# 然后使用说明文档中的参数名(这里是rpeaks)作为key取值。
rpeaks_indices_2 = rpeaks_indices_2["rpeaks"]
logging.info("完成. 结果类型为 " + str(type(rpeaks_indices_2)))
# 检验两种方法得到的结果是否相同:
check_sum = np.sum(rpeaks_indices_1 == rpeaks_indices_2)
if check_sum == len(rpeaks_indices_1):
logging.info("两种取值方式结果相同 ... ")
else:
logging.info("两种取值方式结果不同,退出 ...")
sys.exit(1)
# 与 christov_segmenter 接口一致的还有 hamilton_segmenter
logging.info("调用接口一致的 hamilton_segmenter 进行R波检测")
tic = time.time()
rpeaks = ecg.hamilton_segmenter(signal, sampling_rate=fs)
toc = time.time()
logging.info("完成. 用时: %f 秒. " % (toc - tic))
rpeaks_indices_3 = rpeaks.as_dict()["rpeaks"]
# 绘波形图和R波位置
num_plot_samples = 3600
logging.info("绘制波形图和检测的R波位置 ...")
sig_plot = signal[:num_plot_samples]
rpeaks_plot_1 = rpeaks_indices_1[rpeaks_indices_1 <= num_plot_samples]
plt.figure()
plt.plot(sig_plot, "g", label="ECG")
plt.grid(True)
plt.plot(rpeaks_plot_1, sig_plot[rpeaks_plot_1], "ro", label="christov_segmenter")
rpeaks_plot_3 = rpeaks_indices_3[rpeaks_indices_3 <= num_plot_samples]
plt.plot(rpeaks_plot_3, sig_plot[rpeaks_plot_3], "b^", label="hamilton_segmenter")
plt.legend()
plt.title(data_path)
def _get_rpeaks(self):
"""Hamilton-Tompkins r-peak detection."""
# Get BioSPPy ecg object
ecg_object = ecg.ecg(signal=self.waveform, sampling_rate=self.fs, show=False)
return ecg_object['rpeaks']
ax1.tick_params(labelbottom='off')
ax1.yaxis.set_tick_params(labelsize=16)
# Plot CAM
ax2.plot(time_series_filt_ts, cam_filt, '-k', lw=1.5)
# Axes labels
ax2.set_xlabel('Time, seconds', fontsize=22)
ax2.set_ylabel('Class Activation Map', fontsize=22)
ax2.set_xlim([0, time_series_filt_ts.max()])
ax2.set_ylim([cam_filt.min()-0.05, cam_filt.max()+0.05])
ax2.xaxis.set_tick_params(labelsize=16)
ax2.yaxis.set_tick_params(labelsize=16)
# Get ecg object
ecg_object = ecg.ecg(time_series_filt, sampling_rate=fs, show=False)
# Get waveform templates
templates, _ = _get_templates(time_series_filt, ecg_object['rpeaks'], 0.4, 0.6, fs)
cam_filt, _, _ = filter_signal(signal=cam_filt,
ftype='FIR',
band='bandpass',
order=int(0.3 * fs),
frequency=[3, 100],
sampling_rate=fs)
# Get cam templates
cam_templates, _ = _get_templates(cam_filt, ecg_object['rpeaks'], 0.4, 0.6, fs)
templates_ts = np.linspace(-250, 400, templates.shape[0], endpoint=False)
def _preprocess_signal(self, signal_raw, filter_bandwidth, normalize, polarity_check,
template_before, template_after):
# Filter signal
signal_filtered = self._apply_filter(signal_raw, filter_bandwidth)
# Get BioSPPy ECG object
ecg_object = ecg.ecg(signal=signal_raw, sampling_rate=self.fs, show=False)
# Get BioSPPy output
ts = ecg_object['ts'] # Signal time array
rpeaks = ecg_object['rpeaks'] # rpeak indices
# Get templates and template time array
templates, rpeaks = self._extract_templates(signal_filtered, rpeaks, template_before, template_after)
templates_ts = np.linspace(-template_before, template_after, templates.shape[1], endpoint=False)
# Polarity check
signal_raw, signal_filtered, templates = self._check_waveform_polarity(polarity_check=polarity_check,
signal_raw=signal_raw,
signal_filtered=signal_filtered,
templates=templates)
# Normalize waveform
signal_raw, signal_filtered, templates = self._normalize_waveform_amplitude(normalize=normalize,
def plotEcgSignal(file=f, delimiter="", positionInCsvFile=2):
return ecg.ecg(signal=getEcgDataFromFile(), sampling_rate=1000., show=True)