Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Demo: switch a Cyton board to analog mode mid-stream, then inspect the end byte of the
# recorded packages and dump everything to a csv file.
# '/2' enables analog mode - only for Cyton based boards!
board.config_board ('/2')
# NOTE(review): presumably the pause lets packages recorded in analog mode accumulate - confirm
time.sleep (5)
data = board.get_board_data ()
board.stop_stream ()
board.release_session ()

"""
data[BoardShim.get_other_channels(args.board_id)[0]] contains cyton end byte
data[BoardShim.get_other_channels(args.board_id)[1....]] contains unprocessed bytes
if end byte is 0xC0 there are accel data in data[BoardShim.get_accel_channels(args.board_id)[....]] else there are zeros
if end byte is 0xC1 there are analog data in data[BoardShim.get_analog_channels(args.board_id)[....]] else there are zeros
"""
# the first "other" channel is the row with the end byte of each package
end_byte_channel = BoardShim.get_other_channels (args.board_id)[0]
end_bytes = data[end_byte_channel]
print (end_bytes[0:5]) # should be standard end byte 0xC0
print (end_bytes[-5:]) # should be analog end byte 0xC1

DataFilter.write_file (data, 'cyton_data.csv', 'w')
# Demo: record data from the board, then run a wavelet round trip and an fft round trip
# on every eeg channel and print the results.
board.start_stream ()
BoardShim.log_message (LogLevels.LEVEL_INFO.value, 'start sleeping in the main thread')
# keep streaming for a while so there is data to transform
time.sleep (10)
data = board.get_board_data ()
board.stop_stream ()
board.release_session ()

eeg_channels = BoardShim.get_eeg_channels (board_id)
# demo for transforms
for count, channel in enumerate (eeg_channels):
    channel_data = data[channel]
    print ('Original data for channel %d:' % channel)
    print (channel_data)
    # demo for wavelet transforms
    # wavelet_coeffs format is [A(J) D(J) D(J-1) ..... D(1)] where J is decomposition level, A - app coeffs, D - detailed coeffs
    # lengths array stores lengths for each block
    wavelet_coeffs, lengths = DataFilter.perform_wavelet_transform (channel_data, 'db5', 3)
    app_coefs = wavelet_coeffs[0: lengths[0]]
    # lengths holds block sizes, not offsets: D(J) starts right after A(J) at lengths[0]
    # and is lengths[1] items long
    detailed_coeffs_first_block = wavelet_coeffs[lengths[0] : lengths[0] + lengths[1]]
    # you can do something with wavelet coeffs here, for example denoising works via thresholds
    # for wavelet coefficients
    restored_data = DataFilter.perform_inverse_wavelet_transform ((wavelet_coeffs, lengths), channel_data.shape[0], 'db5', 3)
    print ('Restored data after wavelet transform for channel %d:' % channel)
    print (restored_data)
    # demo for fft, len of data must be a power of 2
    fft_data = DataFilter.perform_fft (channel_data[:128])
    # len of fft_data is N / 2 + 1
    restored_fft_data = DataFilter.perform_ifft (fft_data)
    print ('Restored data after fft for channel %d:' % channel)
    print (restored_fft_data)
# Wavelet and fft round trip for one eeg channel.
# NOTE(review): `channel` is not bound inside this excerpt - presumably these statements
# belong to a loop over the eeg channels; confirm against the full file.
print ('Original data for channel %d:' % channel)
print (data[channel])
# demo for wavelet transforms
# wavelet_coeffs format is [A(J) D(J) D(J-1) ..... D(1)] where J is decomposition level, A - app coeffs, D - detailed coeffs
# lengths array stores lengths for each block
wavelet_coeffs, lengths = DataFilter.perform_wavelet_transform (data[channel], 'db5', 3)
app_coefs = wavelet_coeffs[0: lengths[0]]
# lengths holds block sizes, not offsets: D(J) starts right after A(J) at lengths[0]
# and is lengths[1] items long
detailed_coeffs_first_block = wavelet_coeffs[lengths[0] : lengths[0] + lengths[1]]
# you can do something with wavelet coeffs here, for example denoising works via thresholds
# for wavelet coefficients
restored_data = DataFilter.perform_inverse_wavelet_transform ((wavelet_coeffs, lengths), data[channel].shape[0], 'db5', 3)
print ('Restored data after wavelet transform for channel %d:' % channel)
print (restored_data)
# demo for fft, len of data must be a power of 2
fft_data = DataFilter.perform_fft (data[channel][:128])
# len of fft_data is N / 2 + 1
restored_fft_data = DataFilter.perform_ifft (fft_data)
print ('Restored data after fft for channel %d:' % channel)
print (restored_fft_data)
# Demo: fetch the recorded data, plot the eeg channels, round-trip the data through a csv
# file and apply a different filter to each of the first four eeg channels.
# data = board.get_current_board_data (256) # get latest 256 packages or less, doesn't remove them from internal buffer
data = board.get_board_data () # get all data and remove it from internal buffer
board.stop_stream ()
board.release_session ()

# demo how to convert it to pandas DF and plot data
eeg_channels = BoardShim.get_eeg_channels (args.board_id)
# rows of data are channels (see data[channel] below), transpose to get one column per channel
df = pd.DataFrame (np.transpose (data))
print ('Data From the Board')
print (df.head ())
plt.figure ()
df[eeg_channels].plot (subplots = True)
plt.savefig ('before_processing.png')

# demo for data serialization
DataFilter.write_file (data, 'test.csv', 'w')
restored_data = DataFilter.read_file ('test.csv')
restored_df = pd.DataFrame (np.transpose (restored_data))
print ('Data From the File')
print (restored_df.head ())

# demo how to perform signal processing
# the sampling rate does not change between channels, look it up once
sampling_rate = BoardShim.get_sampling_rate (args.board_id)
for count, channel in enumerate (eeg_channels):
    # filters work in-place, return values are not used
    if count == 0:
        DataFilter.perform_bandpass (data[channel], sampling_rate, 15.0, 6.0, 4, FilterTypes.BESSEL.value, 0)
    elif count == 1:
        DataFilter.perform_bandstop (data[channel], sampling_rate, 5.0, 1.0, 3, FilterTypes.BUTTERWORTH.value, 0)
    elif count == 2:
        DataFilter.perform_lowpass (data[channel], sampling_rate, 9.0, 5, FilterTypes.CHEBYSHEV_TYPE_1.value, 1)
    elif count == 3:
        DataFilter.perform_highpass (data[channel], sampling_rate, 3.0, 4, FilterTypes.BUTTERWORTH.value, 0)
# Demo: fetch the recorded data, plot the eeg channels, round-trip the data through a csv
# file and apply a different filter to each of the first four eeg channels.
# data = board.get_current_board_data (256) # get latest 256 packages or less, doesn't remove them from internal buffer
data = board.get_board_data () # get all data and remove it from internal buffer
board.stop_stream ()
board.release_session ()

# demo how to convert it to pandas DF and plot data
eeg_channels = BoardShim.get_eeg_channels (args.board_id)
# rows of data are channels (see data[channel] below), transpose to get one column per channel
df = pd.DataFrame (np.transpose (data))
print ('Data From the Board')
print (df.head (10))
plt.figure ()
df[eeg_channels].plot (subplots = True)
plt.savefig ('before_processing.png')

# demo for data serialization
DataFilter.write_file (data, 'test.csv', 'w')
restored_data = DataFilter.read_file ('test.csv')
restored_df = pd.DataFrame (np.transpose (restored_data))
print ('Data From the File')
print (restored_df.head (10))

# demo how to perform signal processing
# the sampling rate does not change between channels, look it up once
sampling_rate = BoardShim.get_sampling_rate (args.board_id)
for count, channel in enumerate (eeg_channels):
    # filters work in-place
    if count == 0:
        DataFilter.perform_bandpass (data[channel], sampling_rate, 15.0, 6.0, 4, FilterTypes.BESSEL.value, 0)
    elif count == 1:
        DataFilter.perform_bandstop (data[channel], sampling_rate, 5.0, 1.0, 3, FilterTypes.BUTTERWORTH.value, 0)
    elif count == 2:
        DataFilter.perform_lowpass (data[channel], sampling_rate, 9.0, 5, FilterTypes.CHEBYSHEV_TYPE_1.value, 1)
    elif count == 3:
        DataFilter.perform_highpass (data[channel], sampling_rate, 3.0, 4, FilterTypes.BUTTERWORTH.value, 0)