Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""Discover all LSL streams available on the local network.
Returns:
dict[str, dict[str, pylsl.StreamInfo]]: Streams mapped to source/type.
Keys are source IDs; values are dictionaries whose keys are stream
types and whose values are the corresponding stream objects.
Example:
When EEG and accelerometer streams are found for a single Muse headset:
>>> get_lsl_streams()
{'Muse-00:00:00:00:00:00': {'EEG': <pylsl.StreamInfo>,
'accelerometer': <pylsl.StreamInfo>}}
"""
streams = [(stream.source_id(), stream.type(), stream)
for stream in lsl.resolve_streams(wait_time=2)]
streams_dict = streams_dict_from_streams(streams)
return streams_dict
def connect(self, find_any=True):
"""
Run in child process
"""
server_found = False
amps = []
channels = 0
while server_found == False:
if self.amp_name is None and self.amp_serial is None:
logger.info("Looking for a streaming server...")
else:
logger.info("Looking for %s (Serial %s) ..." % (self.amp_name, self.amp_serial))
streamInfos = pylsl.resolve_streams()
if len(streamInfos) > 0:
# For now, only 1 amp is supported by a single StreamReceiver object.
for si in streamInfos:
# is_slave= ('true'==pylsl.StreamInlet(si).info().desc().child('amplifier').child('settings').child('is_slave').first_child().value() )
inlet = pylsl.StreamInlet(si)
# LSL XML parser has a bug which crashes so do not use for now
#amp_serial = inlet.info().desc().child('acquisition').child_value('serial_number')
amp_serial = 'N/A'
amp_name = si.name()
# connect to a specific amp only?
if self.amp_serial is not None and self.amp_serial != amp_serial:
continue
# connect to a specific amp only?
if self.amp_name is not None and self.amp_name != amp_name:
def is_available():
    """Report whether any LSL stream can currently be resolved.

    Returns
    -------
    ok : bool
        True when ``pylsl.resolve_streams()`` yields a non-empty result,
        False otherwise.
    """
    # The truthiness of the resolver's result is the whole answer.
    return bool(pylsl.resolve_streams())
timeout = patch.getfloat('lsl', 'timeout', default=30)
lsl_name = patch.getstring('lsl', 'name')
lsl_type = patch.getstring('lsl', 'type')
lsl_format = patch.getstring('lsl', 'format')
output_prefix = patch.getstring('output', 'prefix')
monitor.info("looking for an LSL stream...")
start = time.time()
selected = []
while len(selected) < 1:
if (time.time() - start) > timeout:
monitor.error("Error: timeout while waiting for LSL stream")
raise SystemExit
# find the desired stream on the lab network
streams = lsl.resolve_streams()
for stream in streams:
inlet = lsl.StreamInlet(stream)
name = inlet.info().name()
type = inlet.info().type()
source_id = inlet.info().source_id()
# determine whether this stream should be further processed
match = True
if len(lsl_name):
match = match and lsl_name == name
if len(lsl_type):
match = match and lsl_type == type
if match:
# select this stream for further processing
selected.append(stream)
monitor.info('-------- STREAM(*) ------')
else:
def enumerate():
    """Print the name, type and source id of each resolved stream.

    One line is printed per entry returned by ``resolve_streams()``;
    nothing is returned.
    """
    for info in resolve_streams():
        # Query the fields in the same order they appear in the output line.
        label = info.name()
        kind = info.type()
        origin = info.source_id()
        print(f"name: {label}, type: {kind}, source_id: {origin}")
ft_output = FieldTrip.Client()
ft_output.connect(ft_host, ft_port)
monitor.info("Connected to output FieldTrip buffer")
except:
raise RuntimeError("cannot connect to output FieldTrip buffer")
monitor.success("looking for an LSL stream...")
start = time.time()
selected = []
while len(selected) < 1:
if (time.time() - start) > timeout:
monitor.error("Error: timeout while waiting for LSL stream")
raise SystemExit
# find the desired stream on the lab network
streams = lsl.resolve_streams()
for stream in streams:
inlet = lsl.StreamInlet(stream)
name = inlet.info().name()
type = inlet.info().type()
source_id = inlet.info().source_id()
# determine whether this stream should be further processed
match = True
if len(lsl_name):
match = match and lsl_name == name
if len(lsl_type):
match = match and lsl_type == type
if match:
# select this stream for further processing
selected.append(stream)
monitor.success('-------- STREAM(*) ------')
else: