Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if file.endswith(".dat"):
dat_files.append(file)
mit_records = [w.replace(".dat", "") for w in dat_files]
results = np.zeros((len(mit_records), 5), dtype=int)
i = 0
for record in mit_records:
progress = int(i/float(len(mit_records))*100.0)
print("MITDB progress: %i%%" % progress)
sig, fields = wfdb.rdsamp(self.mitdb_dir+'/'+record)
unfiltered_ecg = sig[:, 0]
ann = wfdb.rdann(str(self.mitdb_dir+'/'+record), 'atr')
anno = _tester_utils.sort_MIT_annotations(ann)
r_peaks = detector(unfiltered_ecg)
delay = _tester_utils.calcMedianDelay(r_peaks, unfiltered_ecg, max_delay_in_samples)
if delay > 1:
TP, FP, FN = _tester_utils.evaluate_detector(r_peaks, anno, delay, tol=tolerance)
TN = len(unfiltered_ecg)-(TP+FP+FN)
results[i, 0] = int(record)
results[i, 1] = TP
results[i, 2] = FP
results[i, 3] = FN
results[i, 4] = TN
# Sample window passed to the wfdb readers below.
sampto = None  # alternative kept from the original: int(60. * 1 * 360)
sampfrom = 0  # alternative kept from the original: int(15. * 360)
# NOTE(review): these two lists are not filled in the visible part of the
# script.
sensitivity = []
precision = []

# Each `subject` is a (record entry, annotation entry) pair.
for subject in zip(records, annotations):
    # Alternative iteration over a subset, kept from the original:
    # for i in selection:
    #
    #     subject = zip(records, annotations)[i]

    # Characters [-7:-4] of the annotation entry are printed as the subject
    # id -- presumably the three-digit record number; TODO confirm.
    print('processing subject {}'.format(subject[1][-7:-4]))

    # The last four characters of each entry (presumably the file
    # extension) are dropped before the entry is handed to wfdb.
    # NOTE(review): `sampfrom` is applied to the annotation read only, not
    # to the record read.
    data = wfdb.rdrecord(subject[0][:-4], sampto=sampto)
    annotation = wfdb.rdann(subject[1][:-4], 'atr',
                            sampfrom=sampfrom,
                            sampto=sampto)

    sfreq = data.fs
    # First column of the record's signal array.
    ecg = data.p_signal[:, 0]

    # Annotated sample positions vs. positions returned by `rpeaks`.
    manupeaks = annotation.sample
    # Alternative detector call kept from the original:
    # algopeaks = peaks_signal(ecg, sfreq)
    algopeaks = rpeaks(ecg, sfreq)

    # tolerance for match between algorithmic and manual annotation (in sec)
    tolerance = 0.05
    # The tolerance is converted to a whole number of samples
    # (seconds * sampling frequency, rounded) for the comparison.
    comparitor = compare_annotations(manupeaks, algopeaks,
                                     int(np.rint(tolerance * sfreq)))
    tp = comparitor.tp
    fp = comparitor.fp
def make_dataset(records, width, savepath):
    """Build stacked signal and label arrays from a list of records.

    Every path in ``records`` is read with ``wf.rdsamp`` and ``wf.rdann``
    (``'atr'`` annotations), converted into labelled fragments by
    ``convert_data``, and the per-record results are stacked with
    ``np.vstack``.

    Args:
        records: Iterable of record paths accepted by ``wf.rdsamp`` and
            ``wf.rdann``.
        width: Passed unchanged to ``convert_data``.
        savepath: Destination for the dataset. NOTE(review): not used in
            the visible part of this function; the write step announced by
            the final comment is not present here.

    Raises:
        ValueError: From ``np.vstack`` when ``records`` is empty.
    """
    # Prepare containers
    signals, labels = [], []

    # Iterate files
    for path in records:
        # A single pre-formatted argument prints the same text under both
        # the Python 2 print statement and the Python 3 print function.
        print('Processing file: %s' % (path,))
        record = wf.rdsamp(path)
        annotations = wf.rdann(path, 'atr')

        # Extract pure signals
        data = record.p_signals

        # Convert each channel into labeled fragments
        signal, label = convert_data(data, annotations, width)

        # Cumulate
        signals.append(signal)
        labels.append(label)

    # Convert to one huge numpy.array
    signals = np.vstack(signals)
    labels = np.vstack(labels)

    # Write to disk
def read_file(file, participant):
    """Load one record's first signal channel and its beat annotations.

    Args:
        file: Path of a record file. Its last four characters (presumably
            the extension) are dropped before the path is passed to wfdb.
            If it contains ``"x_mitdb"`` the database label becomes
            ``"MIT-Arrhythmia-x"``, otherwise ``"MIT-Arrhythmia"``.
        participant: Integer participant number, formatted into the
            ``"MIT-Arrhythmia_NN"`` participant label.

    Returns:
        tuple: ``(data, anno)`` where ``data`` is a DataFrame with columns
        ``ECG``, ``Participant``, ``Sample``, ``Sampling_Rate``,
        ``Database`` and ``anno`` is a DataFrame with columns ``Rpeaks``,
        ``Participant``, ``Sampling_Rate``, ``Database``.
    """
    # Values shared by both frames are computed once so they cannot drift.
    record_path = file[:-4]
    participant_label = "MIT-Arrhythmia_%.2i" %(participant)
    database = "MIT-Arrhythmia-x" if "x_mitdb" in file else "MIT-Arrhythmia"
    # NOTE(review): the sampling rate is hard-coded rather than read from
    # the record -- confirm it holds for every file passed in.
    sampling_rate = 360

    # Annotation symbols whose sample positions are kept as R-peaks.
    beat_symbols = ['N', 'L', 'R', 'B', 'A', 'a', 'J', 'S', 'V', 'r', 'F',
                    'e', 'j', 'n', 'E', '/', 'f', 'Q', '?']

    # Get signal: first column of the array returned by rdsamp
    data = pd.DataFrame({"ECG": wfdb.rdsamp(record_path)[0][:, 0]})
    data["Participant"] = participant_label
    data["Sample"] = range(len(data))
    data["Sampling_Rate"] = sampling_rate
    data["Database"] = database

    # Get annotations: keep only samples labelled with a beat symbol.
    # np.isin replaces the deprecated np.in1d; the result is the same for
    # the one-dimensional symbol list.
    annotation = wfdb.rdann(record_path, 'atr')
    is_beat = np.isin(annotation.symbol, beat_symbols)
    anno = pd.DataFrame({"Rpeaks": np.unique(annotation.sample[is_beat])})
    anno["Participant"] = participant_label
    anno["Sampling_Rate"] = sampling_rate
    anno["Database"] = database

    return data, anno
def show_path(path):
    """Plot the first channel of a record against time.

    Prints the name of the first channel, then shows a plot of that
    channel with the x axis labelled ``'Time [s]'``.

    Args:
        path: Record path accepted by ``wf.rdsamp`` and ``wf.rdann``.
    """
    # Read in the data
    record = wf.rdsamp(path)
    # NOTE(review): the annotation is loaded but not used below. The call
    # is kept so an unreadable 'atr' file still fails here as before.
    annotation = wf.rdann(path, 'atr')

    data = record.p_signals
    cha = data[:, 0]
    # A single pre-formatted argument prints the same text under both the
    # Python 2 print statement and the Python 3 print function.
    print('Channel type: %s' % (record.signame[0],))

    # Sample index divided by the record's sampling frequency
    times = np.arange(len(cha), dtype = float)
    times /= record.fs

    plt.plot(times, cha)
    plt.xlabel('Time [s]')
    plt.show()
def _get_sections(self):
"""Collect continuous arrhythmia sections."""
# Empty dictionary for arrhythmia sections
sections = list()
# Loop through records
for record_id in self.record_ids:
# Import recording
record = wfdb.rdrecord(os.path.join(self.raw_path, record_id))
# Import annotations
annotation = wfdb.rdann(os.path.join(self.raw_path, record_id), 'atr')
# Get sample frequency
fs = record.__dict__['fs']
# Get waveform
waveform = record.__dict__['p_signal']
# labels
labels = [label[1:] for label in annotation.__dict__['aux_note']]
# Samples
sample = annotation.__dict__['sample']
# Loop through labels and collect sections
for idx, label in enumerate(labels):
def show_annotations(path):
    """Plot the first 2000 samples of a record's first channel.

    Also selects the annotation sample positions and annotation types that
    fall inside that window. NOTE(review): ``samp`` and ``types`` are not
    used in the visible part of this function.

    Args:
        path: Record path accepted by ``wf.rdsamp`` and ``wf.rdann``.
    """
    record = wf.rdsamp(path)
    annotation = wf.rdann(path, 'atr')

    # Get data and annotations for the first 2000 samples
    howmany = 2000
    channel = record.p_signals[:howmany, 0]

    # Extract all of the annotation related information
    where = annotation.annsamp < howmany
    samp = annotation.annsamp[where]

    # Convert to numpy.array to get fancy indexing access
    types = np.array(annotation.anntype)
    types = types[where]

    # NOTE(review): `times` always has `howmany` entries, while `channel`
    # is shorter when the record holds fewer samples than that.
    times = np.arange(howmany, dtype = 'float') / record.fs
    plt.plot(times, channel)