Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
model.predict( np.zeros((32,16,60)).reshape(reshape) )
ACTION = 'left' # THIS IS THE ACTION YOU'RE THINKING
FFT_MAX_HZ = 60
HM_SECONDS = 10 # this is approximate. Not 100%. do not depend on this.
TOTAL_ITERS = HM_SECONDS*25 # ~25 iters/sec
BOX_MOVE = "model" # random or model
last_print = time.time()
fps_counter = deque(maxlen=150)
# first resolve an EEG stream on the lab network
print("looking for an EEG stream...")
streams = resolve_stream('type', 'EEG')
# create a new inlet to read from the stream
inlet = StreamInlet(streams[0])
WIDTH = 800
HEIGHT = 800
SQ_SIZE = 50
MOVE_SPEED = 1
square = {'x1': int(int(WIDTH)/2-int(SQ_SIZE/2)),
'x2': int(int(WIDTH)/2+int(SQ_SIZE/2)),
'y1': int(int(HEIGHT)/2-int(SQ_SIZE/2)),
'y2': int(int(HEIGHT)/2+int(SQ_SIZE/2))}
box = np.ones((square['y2']-square['y1'], square['x2']-square['x1'], 3)) * np.random.uniform(size=(3,))
horizontal_line = np.ones((HEIGHT, 10, 3)) * np.random.uniform(size=(3,))
"""Configure the lsl device.
This method looks for open lsl streams and picks the first `EEG`
and `Markers` streams and opens lsl inlets for them.
Note that lsl amplifiers cannot be configured via lsl, as the
protocol was not designed for that. You can only connect (i.e.
subscribe) to devices that connected (publishing) via the lsl
protocol.
"""
# lsl defined
self.max_samples = 1024
# open EEG stream
logger.debug('Opening EEG stream...')
streams = pylsl.resolve_stream('type', 'EEG')
if len(streams) > 1:
logger.warning('Number of EEG streams is > 0, picking the first one.')
self.lsl_inlet = pylsl.StreamInlet(streams[0])
# open marker stream
logger.debug('Opening Marker stream...')
# TODO: should add a timeout here in case there is no marker
# stream
streams = pylsl.resolve_stream('type', 'Markers')
if len(streams) > 1:
logger.warning('Number of Marker streams is > 0, picking the first one.')
self.lsl_marker_inlet = pylsl.StreamInlet(streams[0])
info = self.lsl_inlet.info()
self.n_channels = info.channel_count()
self.channels = ['Ch %i' % i for i in range(self.n_channels)]
self.fs = info.nominal_srate()
logger.debug('Initializing time correction...')
def __init__(self, stream_type: str = 'EEG'):
super(LslDataSource, self).__init__()
print('Waiting for LSL EEG data stream...')
self.stream_type = stream_type
streams = pylsl.resolve_stream('type', self.stream_type)
inlet = pylsl.StreamInlet(streams[0])
info = inlet.info()
fs = float(info.nominal_srate())
self.sample_rate = fs
print(f'Sample rate: {fs}')
name = info.name()
channel_names = []
ch = info.desc().child("channels").child("channel")
for k in range(info.channel_count()):
channel_names.append(ch.child_value("label"))
ch = ch.next_sibling()
self.device_info = DeviceInfo(fs=fs, channels=channel_names, name=name)
self.inlet = inlet
info.desc().append_child_value("manufacturer", "SCCN")
cap = info.desc().append_child("cap")
cap.append_child_value("name", "EasyCap")
cap.append_child_value("size", "54")
cap.append_child_value("labelscheme", "10-20")
# create outlet for the stream
outlet = StreamOutlet(info)
# (...normally here one might start sending data into the outlet...)
# === the following could run on another computer ===
# first we resolve a stream whose name is MetaTester (note that there are
# other ways to query a stream, too - for instance by content-type)
results = resolve_stream("name", "MetaTester")
# open an inlet so we can read the stream's data (and meta-data)
inlet = StreamInlet(results[0])
# get the full stream info (including custom meta-data) and dissect it
info = inlet.info()
print("The stream's XML meta-data is: ")
print(info.as_xml())
print("The manufacturer is: %s" % info.desc().child_value("manufacturer"))
print("Cap circumference is: %s" % info.desc().child("cap").child_value("size"))
print("The channel labels are as follows:")
ch = info.desc().child("channels").child("channel")
for k in range(info.channel_count()):
print(" " + ch.child_value("label"))
ch = ch.next_sibling()
def connect(self):
"""Connect to the data source."""
# Streams can be queried by name, type (xdf file format spec), and
# other metadata.
# NOTE: According to the documentation this is a blocking call that can
# only be performed on the main thread in Linux systems. So far testing
# seems fine when done in a separate multiprocessing.Process.
eeg_streams = pylsl.resolve_stream('type', 'EEG')
marker_streams = pylsl.resolve_stream('type', 'Markers')
assert eeg_streams, "One or more EEG streams must be present"
assert marker_streams, "One or more Marker streams must be present"
self._inlet = pylsl.StreamInlet(eeg_streams[0])
self._marker_inlets = [pylsl.StreamInlet(inlet)
for inlet in marker_streams]
# initialize the current_markers for each marker stream.
for inlet in self._marker_inlets:
self.current_markers[inlet_name(inlet)] = Marker.empty()
protocol.
"""
# lsl defined
self.max_samples = 1024
# open EEG stream
logger.debug('Opening EEG stream...')
streams = pylsl.resolve_stream('type', 'EEG')
if len(streams) > 1:
logger.warning('Number of EEG streams is > 0, picking the first one.')
self.lsl_inlet = pylsl.StreamInlet(streams[0])
# open marker stream
logger.debug('Opening Marker stream...')
# TODO: should add a timeout here in case there is no marker
# stream
streams = pylsl.resolve_stream('type', 'Markers')
if len(streams) > 1:
logger.warning('Number of Marker streams is > 0, picking the first one.')
self.lsl_marker_inlet = pylsl.StreamInlet(streams[0])
info = self.lsl_inlet.info()
self.n_channels = info.channel_count()
self.channels = ['Ch %i' % i for i in range(self.n_channels)]
self.fs = info.nominal_srate()
logger.debug('Initializing time correction...')
self.lsl_marker_inlet.time_correction()
self.lsl_inlet.time_correction()
logger.debug('Configuration done.')
def connect(self):
"""Connect to the data source."""
# Streams can be queried by name, type (xdf file format spec), and
# other metadata.
# NOTE: According to the documentation this is a blocking call that can
# only be performed on the main thread in Linux systems. So far testing
# seems fine when done in a separate multiprocessing.Process.
eeg_streams = pylsl.resolve_stream('type', 'EEG')
marker_streams = pylsl.resolve_stream('type', 'Markers')
assert eeg_streams, "One or more EEG streams must be present"
assert marker_streams, "One or more Marker streams must be present"
self._inlet = pylsl.StreamInlet(eeg_streams[0])
self._marker_inlets = [pylsl.StreamInlet(inlet)
for inlet in marker_streams]
# initialize the current_markers for each marker stream.
for inlet in self._marker_inlets:
self.current_markers[inlet_name(inlet)] = Marker.empty()
"""Example program to demonstrate how to read a multi-channel time-series
from LSL in a chunk-by-chunk manner (which is more efficient)."""
from pylsl import StreamInlet, resolve_stream
# first resolve an EEG stream on the lab network
print("looking for an EEG stream...")
streams = resolve_stream('type', 'EEG')
# create a new inlet to read from the stream
inlet = StreamInlet(streams[0])
while True:
# get a new sample (you can also omit the timestamp part if you're not
# interested in it)
chunk, timestamps = inlet.pull_chunk()
if timestamps:
print(timestamps, chunk)
"""Example program to demonstrate how to read string-valued markers from LSL."""
from pylsl import StreamInlet, resolve_stream
# first resolve a marker stream on the lab network
print("looking for a marker stream...")
streams = resolve_stream('type', 'Markers')
# create a new inlet to read from the stream
inlet = StreamInlet(streams[0])
while True:
# get a new sample (you can also omit the timestamp part if you're not
# interested in it)
sample, timestamp = inlet.pull_sample()
print("got %s at time %s" % (sample[0], timestamp))