Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Command-line option for the matplotlib figure size, given as "WIDTHxHEIGHT".
parser.add_option("-f", "--figure",
                  dest="figure", type='string', default="15x6",
                  help="window size.")

# Viewer settings. Only `filt` is consumed in this section; `subsample` and
# `buf` are kept because code outside this section may read them.
filt = True
subsample = 3
buf = 12

(options, args) = parser.parse_args()

window = options.window
scale = options.scale
# "15x6" -> integer pair used as the figure size.
figsize = np.int16(options.figure.split('x'))

logging.debug("looking for an EEG stream...")
streams = resolve_byprop('type', 'EEG', timeout=2)

# resolve_byprop yields an empty result when nothing answered within the timeout.
if not streams:
    raise RuntimeError("Cant find EEG stream")
logging.debug("Start aquiring data")

fig, axes = plt.subplots(1, 1, figsize=figsize, sharex=True)
lslv = LSLViewer(streams[0], fig, axes, window, scale, filter_data=filt)

lslv.start()
try:
    # Blocks until the plot window is closed.
    plt.show()
finally:
    # Always stop the viewer, even if show() is interrupted or raises, so a
    # started viewer is never left running.
    lslv.stop()
def update(self):
# Connects to an LSL stream on first use and caches its metadata and channel labels.
# NOTE(review): indentation was lost in this copy and the body appears to be cut off
# after the last line, so the nesting of the statements below is not visible here.
# When no inlet exists yet, try to resolve a stream by the configured property/value.
if not self._inlet:
self.logger.debug(f"Resolving stream with {self._prop} {self._value}")
streams = resolve_byprop(self._prop, self._value, timeout=self._timeout)
# Nothing was resolved: return without creating an inlet.
if not streams:
return
self.logger.debug("Stream acquired")
# Open an inlet on the first resolved stream.
self._inlet = StreamInlet(streams[0])
info = self._inlet.info()
# Snapshot of the stream info; the XML is flattened by removing newlines and tabs.
self._meta = {
"name": info.name(),
"type": info.type(),
"rate": info.nominal_srate(),
"info": str(info.as_xml()).replace("\n", "").replace("\t", ""),
}
# A list in self._channels is used directly as the labels.
if isinstance(self._channels, list):
self._labels = self._channels
else:
# Otherwise start from the first child of the "channels" element of the description.
description = info.desc()
channel = description.child("channels").first_child()
def __init__(self, size=(600,350)):
    """Attach to the LSL stream named 'bci' and allocate the plotting buffers.

    Raises:
        ValueError: if no stream named 'bci' was resolved (indexing the
            empty result raises IndexError, which is translated here).
    """
    found = resolve_byprop('name', 'bci', timeout=2.5)
    try:
        self.inlet = StreamInlet(found[0])
    except IndexError:
        raise ValueError('Make sure stream name=bci is opened first.')

    self.running = True

    # Buffer length = time window divided by the sampling interval.
    self.frequency = 250.0
    self.sampleinterval = 1 / self.frequency
    self.timewindow = 10
    self._bufsize = int(self.timewindow / self.sampleinterval)

    # Fixed-length deques pre-filled with zeros; each deque copies the
    # initial values, so the two buffers do not share storage.
    initial = [0.0] * self._bufsize
    self.dataBuffer = collections.deque(initial, self._bufsize)
    self.timeBuffer = collections.deque(initial, self._bufsize)

    # Uninitialised arrays matching the buffer length.
    self.x = np.empty(self._bufsize, dtype='float64')
    self.y = np.empty(self._bufsize, dtype='float64')

    self.app = QtGui.QApplication([])
def view():
    """Resolve an EEG stream, open an inlet on it and run the viewer app.

    Raises:
        RuntimeError: if no stream of type 'EEG' is found within
            LSL_SCAN_TIMEOUT.
    """
    print("Looking for an EEG stream...")
    eeg_streams = resolve_byprop('type', 'EEG', timeout=LSL_SCAN_TIMEOUT)

    if not eeg_streams:
        raise RuntimeError("Can't find EEG stream.")

    print("Start acquiring data.")
    # The first resolved stream feeds the canvas.
    eeg_inlet = StreamInlet(eeg_streams[0], max_chunklen=LSL_EEG_CHUNK)
    Canvas(eeg_inlet)
    app.run()
def receiver(self):
""" Receive data from an LSL stream and store it in a circular
buffer.
"""
# NOTE(review): indentation was lost in this copy and the sampling loop is cut off
# right after the lock is acquired; the matching release is not visible here.
streams = []
# Keep resolving by stream name (timeout=10 per attempt) until something is found.
while not streams:
print("Trying to connect to the stream: " + self.lsl_stream_name)
streams = lsl.resolve_byprop(
'name',
self.lsl_stream_name,
timeout=10)
if not streams:
print("\tStream not found, re-trying...")
# Open an inlet on the first match, passing max_buflen=1.
inlet = lsl.StreamInlet(streams[0], max_buflen=1)
print("\tDone")
i = 0
self.last_time.value = 0 # init the last_time value
# Loop for as long as run_state.value is truthy.
while self.run_state.value:
# presumably x is the sample and t its timestamp — confirm against the LSL API
x, t = inlet.pull_sample()
# Store the local time.time() reading taken right after the pull.
self.last_sample_received.value = time.time()
self.lock_primary.acquire() # LOCK-ON
def __init__(self):
    """Open the OpenBCI board, attach to the input LSL stream and create the output stream.

    Raises:
        ValueError: if no LSL stream named ``self.LSL_STREAM_NAME`` was
            resolved (indexing the empty result raises IndexError, which
            is translated here).
    """
    # check device manager for correct COM port.
    self.board = bci.OpenBCIBoard(port='COM3', filter_data=True,
                                  daisy=False)
    # setup LSL
    streams = resolve_byprop('name', self.LSL_STREAM_NAME, timeout=2.5)
    try:
        self.inlet = StreamInlet(streams[0])
    except IndexError:
        # Format the message with the same attribute used for the lookup
        # above; the bare name LSL_STREAM_NAME is not defined in this scope
        # and would turn this error path into a NameError.
        raise ValueError('Make sure stream name="%s", is opened first.'
                         % self.LSL_STREAM_NAME)
    self.running = True
    self.samples = []
    # Outgoing stream description built from the class-level BCI settings.
    info = StreamInfo(self.LSL_BCI_STREAM_NAME, 'eeg',
                      self.LSL_BCI_NUM_CHANNELS, self.LSL_BCI_SAMPLE_RATE,
                      'float32', 'uid2')
    self.outlet = StreamOutlet(info)
def record(duration, filename=None, dejitter=False, data_source="EEG"):
# Resolves a stream of type `data_source`, opens an inlet on it, looks for a
# 'Markers' stream, and starts collecting channel labels.
# NOTE(review): indentation was lost in this copy, the function is cut off at the
# final `for` header, and `duration` / `dejitter` are not used in the visible part.
# Chunk length defaults to the EEG value and is overridden for PPG, ACC and GYRO.
chunk_length = LSL_EEG_CHUNK
if data_source == "PPG":
chunk_length = LSL_PPG_CHUNK
if data_source == "ACC":
chunk_length = LSL_ACC_CHUNK
if data_source == "GYRO":
chunk_length = LSL_GYRO_CHUNK
# No filename given: build one in the current working directory from the
# data source name and a gmtime() timestamp.
if not filename:
filename = os.path.join(os.getcwd(
), "%s_recording_%s.csv" % (data_source, strftime('%Y-%m-%d-%H.%M.%S', gmtime())))
print("Looking for a %s stream..." % (data_source))
streams = resolve_byprop('type', data_source, timeout=LSL_SCAN_TIMEOUT)
# Nothing resolved: print a message and return None.
if len(streams) == 0:
print("Can't find %s stream." % (data_source))
return
print("Started acquiring data.")
# Open an inlet on the first resolved stream with the selected chunk length.
inlet = StreamInlet(streams[0], max_chunklen=chunk_length)
# eeg_time_correction = inlet.time_correction()
print("Looking for a Markers stream...")
marker_streams = resolve_byprop(
'name', 'Markers', timeout=LSL_SCAN_TIMEOUT)
if marker_streams:
inlet_marker = StreamInlet(marker_streams[0])
else:
# NOTE(review): the statements from here down to the next `else:` repeat the
# filename/resolve/inlet/marker sequence above verbatim; this looks like an
# artifact of how this copy was assembled — confirm against the original.
filename = os.path.join(os.getcwd(
), "%s_recording_%s.csv" % (data_source, strftime('%Y-%m-%d-%H.%M.%S', gmtime())))
print("Looking for a %s stream..." % (data_source))
streams = resolve_byprop('type', data_source, timeout=LSL_SCAN_TIMEOUT)
if len(streams) == 0:
print("Can't find %s stream." % (data_source))
return
print("Started acquiring data.")
inlet = StreamInlet(streams[0], max_chunklen=chunk_length)
# eeg_time_correction = inlet.time_correction()
print("Looking for a Markers stream...")
marker_streams = resolve_byprop(
'name', 'Markers', timeout=LSL_SCAN_TIMEOUT)
if marker_streams:
inlet_marker = StreamInlet(marker_streams[0])
else:
# No marker stream: False is stored in place of an inlet.
inlet_marker = False
print("Can't find Markers stream.")
# Read the channel count and the first channel's label from the stream info.
info = inlet.info()
description = info.desc()
Nchan = info.channel_count()
ch = description.child('channels').first_child()
ch_names = [ch.child_value('label')]
# Loop over the remaining channels; its body is not visible in this copy.
for i in range(1, Nchan):
Search and connect to an LSL server
Params
------
server_name:
Name of the server to search
Returns
-------
inlet:
LSL client object
"""
while True:
logger.info('Searching for LSL server %s ...' % server_name)
streamInfos = pylsl.resolve_byprop("name", server_name, timeout=1)
if not streamInfos:
continue
for sinfo in streamInfos:
logger.info('Found %s' % sinfo.name())
if len(streamInfos) == 0:
logger.info('No desired LSL server found. Keep searching...')
time.sleep(1.0)
else:
sinfo = streamInfos[0]
break
return pylsl.StreamInlet(sinfo)