Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import pylsl
import bci.open_bci_v3 as bci
#from random import random as rand
from pylsl import StreamInfo, StreamOutlet
from psychopy import prefs
prefs.general['audioLib'] = ['pygame']
from psychopy import visual, core, sound
import esys_cfg
# Channel count and nominal sampling rate of the stream named 'OpenBCI'.
NUM_CHANNELS = 8
SAMP_RATE = 100

# Describe the float32 'EEG' stream, then open the outlet that publishes it.
info = StreamInfo(
    name='OpenBCI',
    type='EEG',
    channel_count=NUM_CHANNELS,
    nominal_srate=SAMP_RATE,
    channel_format='float32',
    source_id='myuid34234',
)
outlet = StreamOutlet(info)

# TODO: function call to start displaying images. Sketch kept from the draft:
#   def displayStimuli
#       for file in os.listdir('directory'):
#       for i in range(0, len(images)):
#   def display(files, .....):
#   ex: file_name = ['/dir/dir2/img.png']

# PsychoPy window created with the size argument [512, 512].
window = visual.Window([512, 512])

# Load the stimulus configuration and show the trial order it defines.
cfg = esys_cfg.create_config('../stimulus-config/test.yml')
print(cfg.trial_order)
# e.g. trial_order = ['one', 'two', 'one']
def _create_notify_lsl_outlet(self):
    """Create and return the LSL outlet used for notifications.

    The stream is named 'Notifications', has type 'Pupil Capture' and two
    string-formatted channels. Its source id embeds
    ``self.g_pool.ipc_sub_url``.
    """
    info = lsl.StreamInfo(
        name='Notifications',
        type='Pupil Capture',
        channel_count=2,
        channel_format=lsl.cf_string,
        source_id='Notifications @ %s' % self.g_pool.ipc_sub_url,
    )

    # Channel names handed to the shared channel-info helper.
    channel_names = ("subject", "Python Representation")
    self._append_channel_info(info, channel_names)
    self._append_acquisition_info(info)

    return lsl.StreamOutlet(info)
def __init__(self, params, generator, include_meta=True,
             add_markers=False, name='TestStream'):
    """Configure the server and open its LSL outlet.

    Args:
        params: Mapping that must provide 'channels' (a non-empty sequence
            whose items are used as channel labels) and 'hz' (converted
            with ``int``). An optional 'name' overrides ``name``.
        generator: Stored unchanged on ``self.generator``.
        include_meta: When true, a 'channels' description with one
            label/unit/type entry per channel is attached to the stream.
        add_markers: Stored unchanged on ``self.add_markers``.
        name: Stream name used when ``params`` has no 'name' entry.

    Raises:
        KeyError: If 'channels' or 'hz' is missing from ``params``.
        AssertionError: If ``params['channels']`` is empty.
    """
    super(LslDataServer, self).__init__()

    channels = params['channels']
    self.sample_hz = int(params['hz'])
    assert channels, "Channels must be provided as a parameter"

    stream_name = params.get('name', name)
    self.generator = generator
    log.debug("Starting server with params: %s", str(params))

    info = StreamInfo(
        name=stream_name,
        type="EEG",
        channel_count=len(channels),
        nominal_srate=self.sample_hz,
        channel_format='float32',
        source_id="uid12345",
    )

    if include_meta:
        channels_node = info.desc().append_child('channels')
        for label in channels:
            # append_child_value hands back the element it was called on,
            # so each value below lands on the same 'channel' node.
            node = channels_node.append_child('channel')
            node.append_child_value('label', label)
            node.append_child_value('unit', 'microvolts')
            node.append_child_value('type', 'EEG')

    self.outlet = StreamOutlet(info)
    self.add_markers = add_markers
# NOTE(review): fragment - the `try:` this handler belongs to, and the
# definitions of raw, fif_file, server_name, sfreq, n_channels and chunk_size,
# are outside the visible snippet.
except ValueError:
event_ch = None
# Print a summary of the loaded recording, or abort when nothing was loaded.
if raw is not None:
print('Successfully loaded %s\n' % fif_file)
print('Server name: %s' % server_name)
print('Sampling frequency %.1f Hz' % sfreq)
print('Number of channels : %d' % n_channels)
print('Chunk size : %d' % chunk_size)
for i, ch in enumerate(raw.ch_names):
print(i, ch)
print('Trigger channel : %s' % event_ch)
else:
raise RuntimeError('Error while loading %s' % fif_file)
# Set server information: a float32 'EEG' stream with n_channels channels at
# sfreq, using server_name as both the stream name and the source id.
sinfo = pylsl.StreamInfo(server_name, channel_count=n_channels, channel_format='float32',\
nominal_srate=sfreq, type='EEG', source_id=server_name)
desc = sinfo.desc()
# One 'channel' entry (label / type / unit) per name in raw.ch_names.
channel_desc = desc.append_child("channels")
for ch in raw.ch_names:
channel_desc.append_child('channel').append_child_value('label', str(ch))\
.append_child_value('type','EEG').append_child_value('unit','microvolts')
# Additional amplifier and acquisition metadata.
desc.append_child('amplifier').append_child('settings').append_child_value('is_slave', 'false')
desc.append_child('acquisition').append_child_value('manufacturer', 'PyCNBI').append_child_value('serial_number', 'N/A')
outlet = pylsl.StreamOutlet(sinfo, chunk_size=chunk_size)
# Block until the operator confirms, then announce the start.
input('Press Enter to start streaming.')
print('Streaming started')
# Initial streaming state; t_chunk is chunk_size / sfreq (presumably the
# duration of one chunk in seconds - confirm against the streaming loop).
idx_chunk = 0
t_chunk = chunk_size / sfreq
finished = False
from cyton import OpenBCICyton
from pylsl import StreamInfo, StreamOutlet
import numpy as np
# Raw-count conversion factors.
# EEG: 4,500,000 (the 4.5 V reference expressed in microvolts) / gain of 24 /
# (2**23 - 1) full-scale counts, i.e. microvolts per count (not millivolts).
SCALE_FACTOR_EEG = (4500000)/24/(2**23-1)  # uV/count
# AUX: 0.002 / 2**4 per count (presumably the accelerometer scale in G per
# count from the OpenBCI data-format notes - confirm).
SCALE_FACTOR_AUX = 0.002 / (2**4)

# Two 250 Hz float32 streams: 8 EEG channels and 3 AUX channels.
print("Creating LSL stream for EEG. \nName: OpenBCIEEG\nID: OpenBCItestEEG\n")
info_eeg = StreamInfo('OpenBCIEEG', 'EEG', 8, 250, 'float32', 'OpenBCItestEEG')

# The banner must report the AUX stream's own source id.
print("Creating LSL stream for AUX. \nName: OpenBCIAUX\nID: OpenBCItestAUX\n")
info_aux = StreamInfo('OpenBCIAUX', 'AUX', 3, 250, 'float32', 'OpenBCItestAUX')

outlet_eeg = StreamOutlet(info_eeg)
outlet_aux = StreamOutlet(info_aux)


def lsl_streamers(sample):
    """Scale one board sample and push it to the EEG and AUX outlets.

    Args:
        sample: Object exposing ``channels_data`` and ``aux_data`` sequences
            of raw counts.
    """
    outlet_eeg.push_sample(np.array(sample.channels_data)*SCALE_FACTOR_EEG)
    outlet_aux.push_sample(np.array(sample.aux_data)*SCALE_FACTOR_AUX)


# Connect to the board on COM5 (no daisy module) and stream every sample
# through the callback above.
board = OpenBCICyton(port='COM5', daisy=False)
board.start_stream(lsl_streamers)
from psychopy import visual, core, event
from time import time, strftime, gmtime
from optparse import OptionParser
from pylsl import StreamInfo, StreamOutlet, local_clock
from glob import glob
from random import choice
# Command-line interface: one integer option giving the recording length.
parser = OptionParser()
parser.add_option(
    "-d", "--duration",
    dest="duration",
    type='int',
    default=400,
    help="duration of the recording in seconds.",
)
options, args = parser.parse_args()

# Single-channel int32 'Markers' stream with a nominal rate of 0, and the
# outlet that publishes it.
info = StreamInfo(
    name='Markers',
    type='Markers',
    channel_count=1,
    nominal_srate=0,
    channel_format='int32',
    source_id='myuidw43536',
)
outlet = StreamOutlet(info)

markernames = [1, 2]
start = time()

# Trial parameters.
n_trials = 2010
iti = 0.3
soa = 0.2
jitter = 0.2
record_duration = np.float32(options.duration)

# Setup log: one 0/1 draw per trial, with probability 0.15 of drawing 1.
position = np.random.binomial(1, 0.15, n_trials)
# NOTE(review): fragment of a method body - the enclosing `def` and the origin
# of stream1 / stream2 are outside the visible snippet.
# User input parameters: unpack the EEG (stream1) and AUX (stream2) settings.
eeg_name = stream1['name']
eeg_type = stream1['type']
eeg_chan = stream1['channels']
eeg_hz = stream1['sample_rate']
eeg_data = stream1['datatype']
eeg_id = stream1['id']
aux_name = stream2['name']
aux_type = stream2['type']
aux_chan = stream2['channels']
aux_hz = stream2['sample_rate']
aux_data = stream2['datatype']
aux_id = stream2['id']
# Create one StreamInfo per stream from the unpacked settings.
self.info_eeg = StreamInfo(eeg_name,eeg_type,eeg_chan,eeg_hz,eeg_data,eeg_id)
self.info_aux = StreamInfo(aux_name,aux_type,aux_chan,aux_hz,aux_data,aux_id)
# Channel locations: attach a 'channels' node to the EEG stream description.
chns = self.info_eeg.desc().append_child('channels')
# NOTE(review): the label list is chosen from self.eeg_channels, while the
# StreamInfo channel count comes from stream1['channels'] - confirm they agree.
if self.eeg_channels == 16:
labels = ['Fp1','Fp2', 'C3','C4','T5','T6','O1','O2','F7','F8','F3','F4','T3','T4','P3','P4']
else:
labels = ['Fp1','Fp2', 'C3','C4','T5','T6','O1','O2']
# One 'channel' entry (label / unit / type) per label.
for label in labels:
ch = chns.append_child("channel")
ch.append_child_value('label', label)
ch.append_child_value('unit','microvolts')
ch.append_child_value('type','EEG')
# Additional metadata on the EEG stream description.
self.info_eeg.desc().append_child_value('manufacturer','OpenBCI Inc.')
def _create_primitive_lsl_outlet(self, name):
    """Create and return a five-channel primitive-data LSL outlet.

    Args:
        name: Stream name; also embedded in the source id together with
            ``self.g_pool.ipc_sub_url``.

    Returns:
        An ``lsl.StreamOutlet`` for a 'Pupil Capture' stream with five
        double64-formatted channels.
    """
    info = lsl.StreamInfo(
        name=name,
        type='Pupil Capture',
        channel_count=5,
        channel_format=lsl.cf_double64,
        source_id='%s @ %s' % (name, self.g_pool.ipc_sub_url),
    )

    # Channel names handed to the shared channel-info helper.
    channel_names = (
        "diameter",
        "confidence",
        "timestamp",
        "norm_pos_x",
        "norm_pos_y",
    )
    self._append_channel_info(info, channel_names)
    self._append_acquisition_info(info)

    return lsl.StreamOutlet(info)
def set_device_info(self, device_info):
    """Forward the device info to the parent class and open an LSL outlet.

    After the parent call, a float32 'EEG' stream is described from
    ``self._device_info`` (its ``name``, ``channels`` and ``fs``), given a
    freshly generated UUID as source id, and published through
    ``self._outlet``.

    Args:
        device_info: Passed unchanged to the parent ``set_device_info``.
    """
    import pylsl
    import uuid

    super(LslProcessor, self).set_device_info(device_info)

    channels = self._device_info.channels
    info = pylsl.StreamInfo(
        name=self._device_info.name,
        type='EEG',
        channel_count=len(channels),
        nominal_srate=self._device_info.fs,
        channel_format='float32',
        source_id=str(uuid.uuid4()),
    )

    channels_node = info.desc().append_child('channels')
    for channel in channels:
        # append_child_value hands back the element it was called on, so
        # each value below lands on the same 'channel' node.
        node = channels_node.append_child('channel')
        node.append_child_value('label', str(channel))
        node.append_child_value('unit', 'microvolts')
        node.append_child_value('type', 'EEG')

    self._outlet = pylsl.StreamOutlet(info)
def stream(address, backend='auto', interface=None, name=None, ppg_enabled=False, acc_enabled=False, gyro_enabled=False, eeg_disabled=False,):
bluemuse = backend == 'bluemuse'
if not bluemuse:
if not address:
found_muse = find_muse(name)
if not found_muse:
return
else:
address = found_muse['address']
name = found_muse['name']
if not eeg_disabled:
eeg_info = StreamInfo('Muse', 'EEG', MUSE_NB_EEG_CHANNELS, MUSE_SAMPLING_EEG_RATE, 'float32',
'Muse%s' % address)
eeg_info.desc().append_child_value("manufacturer", "Muse")
eeg_channels = eeg_info.desc().append_child("channels")
for c in ['TP9', 'AF7', 'AF8', 'TP10', 'Right AUX']:
eeg_channels.append_child("channel") \
.append_child_value("label", c) \
.append_child_value("unit", "microvolts") \
.append_child_value("type", "EEG")
eeg_outlet = StreamOutlet(eeg_info, LSL_EEG_CHUNK)
if ppg_enabled:
ppg_info = StreamInfo('Muse', 'PPG', MUSE_NB_PPG_CHANNELS, MUSE_SAMPLING_PPG_RATE,
'float32', 'Muse%s' % address)
ppg_info.desc().append_child_value("manufacturer", "Muse")