Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
max_delay_in_samples = 350 / 5
dat_files = []
for file in os.listdir(self.mitdb_dir):
if file.endswith(".dat"):
dat_files.append(file)
mit_records = [w.replace(".dat", "") for w in dat_files]
results = np.zeros((len(mit_records), 5), dtype=int)
i = 0
for record in mit_records:
progress = int(i/float(len(mit_records))*100.0)
print("MITDB progress: %i%%" % progress)
sig, fields = wfdb.rdsamp(self.mitdb_dir+'/'+record)
unfiltered_ecg = sig[:, 0]
ann = wfdb.rdann(str(self.mitdb_dir+'/'+record), 'atr')
anno = _tester_utils.sort_MIT_annotations(ann)
r_peaks = detector(unfiltered_ecg)
delay = _tester_utils.calcMedianDelay(r_peaks, unfiltered_ecg, max_delay_in_samples)
if delay > 1:
TP, FP, FN = _tester_utils.evaluate_detector(r_peaks, anno, delay, tol=tolerance)
TN = len(unfiltered_ecg)-(TP+FP+FN)
results[i, 0] = int(record)
results[i, 1] = TP
- patient001/
- patient002/
- ...
"""
#==============================================================================
# Extracting
#==============================================================================
# Build a nested dict of per-participant records, keyed by group.
# NOTE(review): only the 'patient' directories are listed below; confirm how
# the "Control" branch is populated -- that code is outside this fragment.
data={"Control": {}, "Patient": {}}
participants = [x for x in os.listdir("./data/") if 'patient' in x]
for participant in participants:
    files = os.listdir("./data/" + participant)
    # Only process participants that actually have a WFDB .dat file.
    if len([x for x in files if '.dat' in x]) > 0:
        # Record name = first .dat filename with its extension removed.
        file = [x for x in files if '.dat' in x][0].split(".")[0]
        signals, info = wfdb.rdsamp("data/" + participant + "/" + file)
        signals = pd.DataFrame(signals)
        signals.columns = info["sig_name"]
        data_participant = {}
        data_participant["Signals"] = signals
        data_participant["sampling_rate"] = info["fs"]
        # Header comments are "Key: value" pairs; when a comment has no
        # ": " separator, store NaN under the bare key instead.
        for key in info["comments"]:
            try:
                data_participant[key.split(": ")[0]] = key.split(": ")[1]
            except IndexError:
                data_participant[key.split(":")[0]] = np.nan
        # NOTE(review): fragment is truncated here -- the body of this
        # branch is outside the visible portion of the file.
        if data_participant["Reason for admission"] in ["n/a", "Healthy control"]:
def load_data(signal_filename, label_filename):
    '''
    Load signal file and label file. One-hot encoding of labels.

    Input: signal filename (text file, one record name per line),
           label filename (headerless CSV with columns id,label)
    Output: list of 1-D digital signals (first channel of each record),
            numpy array of one-hot encoded labels
    '''
    signals = []
    # Context manager closes the listing file even on error; the original
    # opened it and never closed the handle.
    with open(signal_filename, 'r') as records_file:
        for record_name in records_file:
            # NOTE(review): .adc() implies an older wfdb API where rdsamp
            # returned a Record object -- confirm against the installed wfdb.
            record = wfdb.rdsamp('data/training2017/{}'.format(record_name.strip()))
            d_signal = record.adc()[:, 0]
            signals.append(d_signal)
    labels = pd.read_csv(label_filename, header=None, names=['id', 'label'])
    # Map the four class symbols to integer codes in one pass; the original
    # mutated the 'label' column in place through chained indexing.
    y = labels['label'].map({'N': 0, 'A': 1, 'O': 2, '~': 3})
    y = keras.utils.to_categorical(y)
    return signals, y
def load_data(record_name):
    '''
    Load signal file
    Input: signal file
    Output: list of signals
    '''
    # Strip stray whitespace (e.g. a trailing newline) from the record name
    # before building the path.
    path = 'data/training2017/{}'.format(record_name.strip())
    record = wfdb.rdsamp(path)
    # Keep only the first ADC channel.
    return record.adc()[:, 0]
def show_path(path):
""" As a plot """
# Read in the data
record = wf.rdsamp(path)
annotation = wf.rdann(path, 'atr')
data = record.p_signals
cha = data[:, 0]
print 'Channel type:', record.signame[0]
times = np.arange(len(cha), dtype = float)
times /= record.fs
plt.plot(times, cha)
plt.xlabel('Time [s]')
plt.show()
def load_data(filename):
    '''
    Load signal file
    Input: signal file (text file, one record name per line)
    Output: list of signals (first channel of each record)
    '''
    signals = []
    # Context manager closes the listing file even if a read fails; the
    # original opened it and never closed the handle.
    with open(filename, 'r') as records_file:
        for record_name in records_file:
            # NOTE(review): .adc() implies an older wfdb API where rdsamp
            # returned a Record object -- confirm against the installed wfdb.
            record = wfdb.rdsamp('data/training2017/{}'.format(record_name.strip()))
            d_signal = record.adc()[:, 0]
            signals.append(d_signal)
    return signals
def show_objective_part2():
    """ For the model """
    # Choose a record
    records = dm.get_records()
    # Hard-coded pick; presumably a record known to show the feature of
    # interest -- TODO confirm why index 13.
    path = records[13]
    record = wf.rdsamp(path)
    ann = wf.rdann(path, 'atr')
    chid = 0
    print 'File:', path
    print 'Channel:', record.signame[chid]
    cha = record.p_signals[:, chid]
    # These were found manually
    sta = 184000
    end = sta + 1000
    # Time axis in seconds for the chosen window.
    times = np.arange(end-sta, dtype = 'float')
    times /= record.fs
    # Extract the annotations for that fragment
    # NOTE(review): function is truncated here -- 'where' is computed but
    # its use is outside the visible portion of the file.
    where = (sta < ann.annsamp) & (ann.annsamp < end)
def make_dataset(records, width, savepath):
    """ Inside an array """
    # Prepare containers
    signals, labels = [], []
    # Iterate files
    for path in records:
        print 'Processing file:', path
        record = wf.rdsamp(path)
        annotations = wf.rdann(path, 'atr')
        # Extract pure signals
        data = record.p_signals
        # Convert each channel into labeled fragments
        # NOTE(review): convert_data is defined elsewhere; presumably it
        # windows the signal into segments of length 'width' -- confirm.
        signal, label = convert_data(data, annotations, width)
        # Cumulate
        signals.append(signal)
        labels.append(label)
    # Convert to one huge numpy.array
    # NOTE(review): function is truncated here -- 'savepath' is unused in
    # the visible portion; the save step is presumably outside this view.
    signals = np.vstack(signals)
    labels = np.vstack(labels)
def show_annotations(path):
    """ Exemplary code """
    record = wf.rdsamp(path)
    annotation = wf.rdann(path, 'atr')
    # Get data and annotations for the first 2000 samples
    howmany = 2000
    channel = record.p_signals[:howmany, 0]
    # Extract all of the annotation related infromation
    where = annotation.annsamp < howmany
    samp = annotation.annsamp[where]
    # Convert to numpy.array to get fancy indexing access
    types = np.array(annotation.anntype)
    types = types[where]
    # Time axis in seconds for the plotted window.
    # NOTE(review): function appears truncated here -- no plt.show() or
    # annotation overlay is visible after the plot call.
    times = np.arange(howmany, dtype = 'float') / record.fs
    plt.plot(times, channel)