Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# lsl defined
self.max_samples = 1024
# open EEG stream
logger.debug('Opening EEG stream...')
streams = pylsl.resolve_stream('type', 'EEG')
if len(streams) > 1:
logger.warning('Number of EEG streams is > 0, picking the first one.')
self.lsl_inlet = pylsl.StreamInlet(streams[0])
# open marker stream
logger.debug('Opening Marker stream...')
# TODO: should add a timeout here in case there is no marker
# stream
streams = pylsl.resolve_stream('type', 'Markers')
if len(streams) > 1:
logger.warning('Number of Marker streams is > 0, picking the first one.')
self.lsl_marker_inlet = pylsl.StreamInlet(streams[0])
info = self.lsl_inlet.info()
self.n_channels = info.channel_count()
self.channels = ['Ch %i' % i for i in range(self.n_channels)]
self.fs = info.nominal_srate()
logger.debug('Initializing time correction...')
self.lsl_marker_inlet.time_correction()
self.lsl_inlet.time_correction()
logger.debug('Configuration done.')
def __init__(self, size=(600,350)):
streams = resolve_byprop('name', 'bci', timeout=2.5)
try:
self.inlet = StreamInlet(streams[0])
except IndexError:
raise ValueError('Make sure stream name=bci is opened first.')
self.running = True
self.ProcessedSig = []
self.SecondTimes = []
self.count = -1
self.sampleCount = self.count
self.maximum = 0
self.minimum = 0
plt.ion()
plt.hold(False)
self.lineHandle = plt.plot(self.SecondTimes, self.ProcessedSig)
plt.title("Live Stream EEG Data")
def __init__(self, stream, fig, axes, window, scale, filter_data=True, dejitter=True):
"""Init"""
self.stream = stream
self.window = window
self.scale = scale
self.dejitter = dejitter
self.inlet = StreamInlet(stream, max_chunklen=buf)
self.filt = filter_data
info = self.inlet.info()
description = info.desc()
self.sfreq = info.nominal_srate()
self.n_samples = int(self.sfreq * self.window)
self.n_chan = info.channel_count()
ch = description.child('channels').first_child()
ch_names = [ch.child_value('label')]
for i in range(self.n_chan):
ch = ch.next_sibling()
ch_names.append(ch.child_value('label'))
if len(lsl_name):
match = match and lsl_name == name
if len(lsl_type):
match = match and lsl_type == type
if match:
# select this stream for further processing
selected.append(stream)
monitor.info('-------- STREAM(*) ------')
else:
monitor.info('-------- STREAM ---------')
monitor.info("name = " + name)
monitor.info("type = " + type)
monitor.info('-------------------------')
# create a new inlet from the first (and hopefully only) selected stream
inlet = lsl.StreamInlet(selected[0])
# give some feedback
lsl_name = inlet.info().name()
lsl_type = inlet.info().type()
lsl_id = inlet.info().source_id()
monitor.success('connected to LSL stream %s (type = %s, id = %s)' % (lsl_name, lsl_type, lsl_id))
# there should not be any local variables in this function, they should all be global
if len(locals()):
print('LOCALS: ' + ', '.join(locals().keys()))
def __init__(self, size=(600,350)):
streams = resolve_byprop('name', 'bci', timeout=2.5)
try:
self.inlet = StreamInlet(streams[0])
except IndexError:
raise ValueError('Make sure stream name=bci is opened first.')
self.running = True
self.frequency = 250.0
self.sampleinterval = (1/self.frequency)
self.timewindow = 10
self._bufsize = int(self.timewindow/self.sampleinterval)
self.dataBuffer = collections.deque([0.0] * self._bufsize, self._bufsize)
self.timeBuffer = collections.deque([0.0] * self._bufsize, self._bufsize)
self.x = np.empty(self._bufsize,dtype='float64')
self.y = np.empty(self._bufsize,dtype='float64')
self.app = QtGui.QApplication([])
self.plt = pg.plot(title='EEG data from OpenBCI')
self.plt.resize(*size)
# 0 = left ear, 1 = left forehead, 2 = right forehead, 3 = right ear
INDEX_CHANNEL = [0]
if __name__ == "__main__":
""" 1. CONNECT TO EEG STREAM """
# Search for active LSL streams
print('Looking for an EEG stream...')
streams = resolve_byprop('type', 'EEG', timeout=2)
if len(streams) == 0:
raise RuntimeError('Can\'t find EEG stream.')
# Set active EEG stream to inlet and apply time correction
print("Start acquiring data")
inlet = StreamInlet(streams[0], max_chunklen=12)
eeg_time_correction = inlet.time_correction()
# Get the stream info and description
info = inlet.info()
description = info.desc()
# Get the sampling frequency
# This is an important value that represents how many EEG data points are
# collected in a second. This influences our frequency band calculation.
# for the Muse 2016, this should always be 256
fs = int(info.nominal_srate())
""" 2. INITIALIZE BUFFERS """
# Initialize raw EEG data buffer
eeg_buffer = np.zeros((int(fs * BUFFER_LENGTH), 1))
"""Example program to show how to read a multi-channel time series from LSL."""
from pylsl import StreamInlet, resolve_stream
# first resolve an EEG stream on the lab network
print("looking for an EEG stream...")
streams = resolve_stream('type', 'EEG')
# create a new inlet to read from the stream
inlet = StreamInlet(streams[0])
while True:
# get a new sample (you can also omit the timestamp part if you're not
# interested in it)
sample, timestamp = inlet.pull_sample()
print(timestamp, sample)
if self._lsl_tr_channel is None:
logger.warning('Trigger channel not fonud. Adding an empty channel 0.')
else:
if self._lsl_tr_channel != 0:
logger.info_yellow('Trigger channel found at index %d. Moving to index 0.' % self._lsl_tr_channel)
self._lsl_eeg_channels.pop(self._lsl_tr_channel)
self._lsl_eeg_channels = np.array(self._lsl_eeg_channels)
self.tr_channel = 0 # trigger channel is always set to 0.
self.eeg_channels = np.arange(1, channels) # signal channels start from 1.
# create new inlets to read from the stream
inlets_master = []
inlets_slaves = []
for amp in amps:
# data type of the 2nd argument (max_buflen) is int according to LSL C++ specification!
inlet = pylsl.StreamInlet(amp, max_buflen=self.stream_bufsec)
inlets_master.append(inlet)
self.buffers.append([])
self.timestamps.append([])
inlets = inlets_master + inlets_slaves
sample_rate = amps[0].nominal_srate()
logger.info('Channels: %d' % channels)
logger.info('LSL Protocol version: %s' % amps[0].version())
logger.info('Source sampling rate: %.1f' % sample_rate)
logger.info('Unit multiplier: %.1f' % self.multiplier)
#self.winsize = int(self.winsec * sample_rate)
#self.bufsize = int(self.bufsec * sample_rate)
self.winsize = int(round(self.winsec * sample_rate))
self.bufsize = int(round(self.bufsec * sample_rate))
self.stream_bufsize = int(round(self.stream_bufsec * sample_rate))
Run in child process
"""
server_found = False
amps = []
channels = 0
while server_found == False:
if self.amp_name is None and self.amp_serial is None:
logger.info("Looking for a streaming server...")
else:
logger.info("Looking for %s (Serial %s) ..." % (self.amp_name, self.amp_serial))
streamInfos = pylsl.resolve_streams()
if len(streamInfos) > 0:
# For now, only 1 amp is supported by a single StreamReceiver object.
for si in streamInfos:
# is_slave= ('true'==pylsl.StreamInlet(si).info().desc().child('amplifier').child('settings').child('is_slave').first_child().value() )
inlet = pylsl.StreamInlet(si)
# LSL XML parser has a bug which crashes so do not use for now
#amp_serial = inlet.info().desc().child('acquisition').child_value('serial_number')
amp_serial = 'N/A'
amp_name = si.name()
# connect to a specific amp only?
if self.amp_serial is not None and self.amp_serial != amp_serial:
continue
# connect to a specific amp only?
if self.amp_name is not None and self.amp_name != amp_name:
continue
# EEG streaming server only?
if self.eeg_only and si.type() != 'EEG':
continue