Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if self.doRealTimeStreaming:
self.path_and_file_name_event = self.path_and_file_name_event
print "Will export Events CSV to:", self.path_and_file_name_event + '.csv'
# Open in append mode
self.fevent = open(self.path_and_file_name_event + '.csv', 'a', buffering=100000)
if self.exportEDFAnnotations:
print "Will export Events from EDF to CSV in:", self.path_and_file_name_event + '.edf' + '.csv'
self.feventEDF = open(self.path_and_file_name_event + '.edf' + '.csv', 'a', buffering=100000)
self.feventEDF.write('onset' + self.delim + 'annotation' + '\n')
# if self.isActivated:
# self.fevent.write('#Streaming restarted at ' + self.time_stamp + '\n')
# self.fevent.write('#Streaming started at ' + self.time_stamp + '\n')
if self.writeEDF:
EDF_format_extention = ".edf"
EDF_format_filetype = pyedflib.FILETYPE_EDFPLUS
temp_filterStringFileIndicator = "_prefiltered"
temp_filterStringHeader = 'HP ' + str(self.prefilterEDF_hp) + ' Hz'
if self.prefilterEDF_hp is None:
EDF_format_extention = ".bdf"
EDF_format_filetype = pyedflib.FILETYPE_BDFPLUS
temp_filterStringFileIndicator = "_unfiltered"
temp_filterStringHeader = 'none'
self.edfWriter = pyedflib.EdfWriter(self.path_and_file_name + "_missing_samples_corrected" + temp_filterStringFileIndicator + EDF_format_extention, self.nChannels+3, file_type=EDF_format_filetype)
"""
Only when the number of annotations you want to write is more than the number of seconds of the duration of the recording, you can use this function to increase the storage space for annotations */
/* Minimum is 1, maximum is 64 */
"""
if self.writeEDFAnnotations:
self.edfWriter.set_number_of_annotation_signals(self.edfAnnotationChannels) #7*60 = 420 annotations per minute on average
def write_edf(fname, raw):
    """Export raw to EDF/BDF file (requires pyEDFlib).

    Parameters
    ----------
    fname : str or path-like
        Target file name. Its final suffix selects the container format,
        compared case-insensitively: ``.edf`` (EDF+, 16-bit digital range)
        or ``.bdf`` (BDF+, 24-bit digital range).
    raw : object
        Recording to export. It must provide ``get_data()`` and an ``info``
        mapping with the keys ``sfreq``, ``nchan``, ``ch_names``,
        ``meas_date``, ``highpass`` and ``lowpass``.

    Raises
    ------
    ValueError
        If the suffix of `fname` is neither ``.edf`` nor ``.bdf``.
    """
    # Digital (integer) sample range for each supported container format.
    digital_ranges = {
        ".edf": (-32768, 32767),
        ".bdf": (-8388608, 8388607),
    }

    # Only the last suffix decides the format, so dotted names such as
    # "subject.01.edf" are accepted. Validate before importing pyEDFlib or
    # loading any data so that a bad file name fails fast and clearly instead
    # of surfacing later as an unbound local variable.
    ext = Path(fname).suffix.lower()
    if ext not in digital_ranges:
        raise ValueError(
            f"Unsupported file extension {ext!r} for {str(fname)!r}; "
            "expected '.edf' or '.bdf'."
        )
    dmin, dmax = digital_ranges[ext]

    import pyedflib

    if ext == ".edf":
        filetype = pyedflib.FILETYPE_EDFPLUS
    else:
        filetype = pyedflib.FILETYPE_BDFPLUS

    data = raw.get_data() * 1e6  # convert to microvolts
    fs = raw.info["sfreq"]
    nchan = raw.info["nchan"]
    ch_names = raw.info["ch_names"]
    meas_date = raw.info["meas_date"]  # may be None
    prefilter = (f"{raw.info['highpass']}Hz - "
                 f"{raw.info['lowpass']}")
    # Extrema along axis 1 (presumably per channel; confirm data layout).
    pmin, pmax = data.min(axis=1), data.max(axis=1)
    # NOTE(review): the visible part of this function ends right after the
    # writer is created. dmin/dmax, fs, ch_names, meas_date, prefilter and
    # pmin/pmax are presumably consumed by header setup and sample writing
    # that follow; make sure the writer is closed there.
    f = pyedflib.EdfWriter(fname, nchan, filetype)
# 5 pulse 3 100 uV 1 Hz 217 Hz
# 6 noise 100 uV - Hz 200 Hz
# 7 sine 1 Hz 100 uV 1 Hz 200 Hz
# 8 sine 8 Hz 100 uV 8 Hz 200 Hz
# 9 sine 8.1777 Hz 100 uV 8.25 Hz 200 Hz
# 10 sine 8.5 Hz 100 uV 8.5Hz 200 Hz
# 11 sine 15 Hz 100 uV 15 Hz 200 Hz
# 12 sine 17 Hz 100 uV 17 Hz 200 Hz
# 13 sine 50 Hz 100 uV 50 Hz 200 Hz
if __name__ == '__main__':
test_data_file = os.path.join('.', 'test_generator2.edf')
file_duration = 600
f = pyedflib.EdfWriter(test_data_file, 13,
file_type=pyedflib.FILETYPE_EDFPLUS)
channel_info = []
data_list = []
ch_dict = {'label': 'squarewave', 'dimension': 'uV', 'sample_rate': 200, 'physical_max': 100, 'physical_min': -100, 'digital_max': 32767, 'digital_min': -32768, 'transducer': '', 'prefilter':''}
channel_info.append(ch_dict)
time = np.linspace(0, file_duration, file_duration*200)
xtemp = np.sin(2*np.pi*0.1*time)
x1 = xtemp.copy()
x1[np.where(xtemp > 0)[0]] = 100
x1[np.where(xtemp < 0)[0]] = -100
data_list.append(x1)
ch_dict = {'label': 'ramp', 'dimension': 'uV', 'sample_rate': 200, 'physical_max': 100, 'physical_min': -100, 'digital_max': 32767, 'digital_min': -32768, 'transducer': '', 'prefilter':''}
channel_info.append(ch_dict)
time = np.linspace(0, file_duration, file_duration*200)
x2 = signal.sawtooth(2 * np.pi * 1 * time)