Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
return visual.ImageStim(win=mywin, image=filename)
mywin = visual.Window([1920, 1080], monitor="testMonitor", units="deg",
fullscr=True)
targets = map(loadImage, glob('stim/target-*.jpg'))
nontargets = map(loadImage, glob('stim/nontarget-*.jpg'))
for ii, trial in trials.iterrows():
# inter trial interval
core.wait(iti + np.random.rand() * jitter)
# onset
pos = trials['position'].iloc[ii]
image = choice(targets if pos == 1 else nontargets)
image.draw()
timestamp = local_clock()
outlet.push_sample([markernames[pos]], timestamp)
mywin.flip()
# offset
core.wait(soa)
mywin.flip()
if len(event.getKeys()) > 0 or (time() - start) > record_duration:
break
event.clearEvents()
# Cleanup
mywin.close()
logger.info('Using software trigger')
# get data file location
LSL_SERVER = 'StreamRecorderInfo'
inlet = cnbi_lsl.start_client(LSL_SERVER)
evefile = inlet.info().source_id()
eveoffset_file = evefile[:-4] + '-offset.txt'
logger.info('Event file is: %s' % evefile)
self.evefile = open(evefile, 'a')
if check_lsl_offset:
# check server LSL time server integrity
logger.info("Checking LSL server's timestamp integrity for logging software triggers.")
amp_name, amp_serial = pu.search_lsl()
sr = StreamReceiver(window_size=1, buffer_size=1, amp_serial=amp_serial, eeg_only=False, amp_name=amp_name)
local_time = pylsl.local_clock()
server_time = sr.get_window_list()[1][-1]
lsl_time_offset = local_time - server_time
with open(eveoffset_file, 'a') as f:
f.write('Local time: %.6f, Server time: %.6f, Offset: %.6f\n' % (local_time, server_time, lsl_time_offset))
logger.info('LSL timestamp offset (%.3f) saved to %s' % (lsl_time_offset, eveoffset_file))
elif self.lpttype == 'FAKE' or self.lpttype is None or self.lpttype is False:
logger.warning('Using a fake trigger.')
self.lpttype = 'FAKE'
self.lpt = None
else:
logger.error('Unrecognized lpttype device name %s' % lpttype)
sys.exit(-1)
def _record_pupil(self):
while self.running:
topic = self.socket.recv_string()
payload = serializer.loads(self.socket.recv(), encoding='utf-8')
samples_lock.acquire()
self.samples.append(('pupil', local_clock(), payload['diameter']))
samples_lock.release()
print('Terminating pupil tracker recording.')
logger.info('Trigger channel = %d' % trg_ch)
# PSD init
if SHOW_PSD:
psde = mne.decoding.PSDEstimator(sfreq=sfreq, fmin=1, fmax=50, bandwidth=None, \
adaptive=False, low_bias=True, n_jobs=1, normalization='length', verbose=None)
watchdog = qc.Timer()
tm = qc.Timer(autoreset=True)
last_ts = 0
while True:
sr.acquire()
window, tslist = sr.get_window() # window = [samples x channels]
window = window.T # chanel x samples
qc.print_c('LSL Diff = %.3f' % (pylsl.local_clock() - tslist[-1]), 'G')
# print event values
tsnew = np.where(np.array(tslist) > last_ts)[0]
if len(tsnew) == 0:
logger.warning('There seems to be delay in receiving data.')
time.sleep(1)
continue
trigger = np.unique(window[trg_ch, tsnew[0]:])
# for Biosemi
# if sr.amp_name=='BioSemi':
# trigger= set( [255 & int(x-1) for x in trigger ] )
if len(trigger) > 0:
logger.info('Triggers: %s' % np.array(trigger))
shim.up = 1
print(" Shimmer %d [Ok]" % (node_idx))
# Main acquisition loop
while True:
try:
sample = []
sample_lslclock = []
for shim in nodes:
if shim.up == 1:
sample.append(shim.read_data())
else:
sample.append([0] * (shim.n_fields))
for samp in sample:
sample_lslclock.append([pylsl.local_clock()] + list(samp[1:]))
if file_flag == 1:
simplejson.dump(sample_lslclock, outfile, separators=(',', ';'))
outfile.write('\n')
# print sample
# plt.title(str(sample[0][0]))
# leeq
if plot_flag == 1:
analogData.add([sample[0][1], sample[0][2]])
sample_idx = sample_idx + 1
if sample_idx % plt_rate == 0:
analogPlot.update(analogData)
if file_flag == 0:
shim.up = 1
print(" Shimmer %d [Ok]" % (node_idx))
# Main acquisition loop
while True:
try:
sample = []
sample_lslclock = []
for shim in nodes:
if shim.up == 1:
sample.append(shim.read_data())
else:
sample.append([0] * (shim.n_fields))
for samp in sample:
sample_lslclock.append([pylsl.local_clock()] + list(samp[1:]))
if file_flag == 1:
simplejson.dump(sample_lslclock, outfile, separators=(',', ';'))
outfile.write('\n')
# print sample
# plt.title(str(sample[0][0]))
# leeq
if plot_flag == 1:
analogData.add([sample[0][1], sample[0][2]])
sample_idx = sample_idx + 1
if sample_idx % plt_rate == 0:
analogPlot.update(analogData)
if file_flag == 0:
timestamp_offset = True
self.watchdog.reset()
tslist = []
received = False
chunk = None
while not received:
while self.watchdog.sec() < 5:
# chunk = [frames]x[ch], tslist = [frames]
if len(tslist) == 0:
chunk, tslist = self.inlets[0].pull_chunk(max_samples=self.stream_bufsize)
if blocking == False and len(tslist) == 0:
return np.empty((0, len(self.ch_list))), []
if len(tslist) > 0:
if timestamp_offset is True:
lsl_clock = pylsl.local_clock()
received = True
break
time.sleep(0.0005)
else:
logger.warning('Timeout occurred while acquiring data. Amp driver bug?')
# give up and return empty values to avoid deadlock
return np.empty((0, len(self.ch_list))), []
data = np.array(chunk)
# BioSemi has pull-up resistor instead of pull-down
if self.amp_name == 'BioSemi' and self._lsl_tr_channel is not None:
datatype = data.dtype
data[:, self._lsl_tr_channel] = (np.bitwise_and(255, data[:, self._lsl_tr_channel].astype(int)) - 1).astype(datatype)
# multiply values (to change unit)
if self.multiplier != 1:
for ch in headset.channel_mask:
channels.append_child("channel").append_child_value("label", ch).append_child_value("unit","microvolts").append_child_value("type","EEG")
# Outgoing buffer size = 360 seconds, transmission chunk size = 32 samples
outlet = pylsl.stream_outlet(info, 1, 32)
while True:
try:
s = headset.get_sample()
except epoc.EPOCTurnedOffError, e:
print "Headset is turned off, waiting..."
time.sleep(0.02)
else:
if s:
outlet.push_sample(pylsl.vectori(s), pylsl.local_clock())
headset.disconnect()
def get_latency(self):
"""Return the recording latency (current time minus last timestamp).
There is a weird bug in this method. Last timestamp looks like it does
not change. For example, if you call this method, wait 3 seconds, and
call it again, it will say the latency is 3 seconds. Is it related to
locking? Refreshing something about the last item in `data`? But the
lock releases after returning. Not sure how to replicate this bug. Lock
is not necessary here.
"""
return local_clock() - self.data[-1][-1]
def now(self) -> float:
"""Create the timestamp for the next marker using the pylsl
local_clock."""
return pylsl.local_clock()