def testMerge(self):
    spk = SpikeTrain(spike_times=numpy.arange(0, 110, 10))
    spk2 = SpikeTrain(spike_times=numpy.arange(100, 210, 10))
    spk.merge(spk2)
    # 11 spikes per train; the shared time at t=100 is kept, so 22 in total
    assert (spk._t_stop == 200) and (len(spk) == 22)
def testMeanRate(self):
    poisson_param = 1. / 40
    isi = numpy.random.exponential(poisson_param, 1000)
    poisson_times = numpy.cumsum(isi)
    spk = SpikeTrain(spike_times=poisson_times)
    # exponential ISIs with mean 1/40 correspond to a rate of ~40 spikes
    # per unit time, so the estimate should land in a band around 40
    assert 35 < spk.mean_rate() < 45
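The bounds in testMeanRate follow directly from the ISI distribution: exponential intervals with mean 1/40 of a time unit give an expected rate of 40 events per unit time. A quick standalone check of that arithmetic in plain numpy, independent of the SpikeTrain class:

import numpy
numpy.random.seed(42)
isi = numpy.random.exponential(1. / 40, 1000)  # mean inter-spike interval = 0.025
empirical_rate = 1. / isi.mean()               # should land near 40
assert 35 < empirical_rate < 45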
def testAppend(self):
    spktrain = SpikeTrain(spike_times=numpy.arange(10))
    # appending a spike train under id 0 is expected to raise here
    self.assertRaises(Exception, self.spk.append, 0, spktrain)
def test_preprocess(self):
    self.features = GeneralNetworkFeatures(logger_level="error")
    time, spiketrains = self.features.preprocess(self.time_original, self.values)
    self.assertEqual(time, self.time_original)
    # each value array should come back as a neo SpikeTrain carrying the
    # original spike times
    for spiketrain, values in zip(spiketrains, self.values):
        self.assertIsInstance(spiketrain, neo.core.SpikeTrain)
        self.assertTrue(np.array_equal(spiketrain, values))
    self.assertEqual(spiketrains[0].t_stop, self.time_original)
# if id_column is given, one spike train is created per id
if id_column is not None:
    for nid in gdf_id_list:
        selected_ids = self._get_selected_ids(nid, id_column,
                                              time_column, t_start,
                                              t_stop, time_unit, data)
        times = data[selected_ids[0]:selected_ids[1], time_column]
        spiketrain_list.append(SpikeTrain(
            times, units=time_unit,
            t_start=t_start, t_stop=t_stop,
            id=nid, **args))
# if id_column is not given, all spike times are collected in one
# spike train with id=None
else:
    train = data[:, time_column]
    spiketrain_list = [SpikeTrain(train, units=time_unit,
                                  t_start=t_start, t_stop=t_stop,
                                  id=None, **args)]
return spiketrain_list
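For readers unfamiliar with the constructor used above: Neo's SpikeTrain takes an array of times plus units and the t_start/t_stop bounds of the recording. A minimal standalone sketch, with made-up values for illustration:

import numpy as np
import quantities as pq
from neo import SpikeTrain

times = np.array([1.5, 10.0, 42.0])  # hypothetical spike times in ms
st = SpikeTrain(times, units='ms', t_start=0.0 * pq.ms, t_stop=100.0 * pq.ms)
print(len(st), st.t_start, st.t_stop)  # 3 spikes bounded by [0 ms, 100 ms]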
def read_spiketrain(self, path):
    # TODO implement read spike train
    if len(path) == 0:
        event_waveform_group = self._processing["EventWaveform"]
    else:
        event_waveform_group = self._processing[path]["EventWaveform"]
    spike_trains = []
    for key in event_waveform_group:
        timeserie = event_waveform_group[key]
        timestamps = timeserie["timestamps"]
        waveforms = timeserie["waveforms"]
        # the last timestamp is used as t_stop; units come from the attrs
        spike_train = SpikeTrain(
            pq.Quantity(timestamps.data, timestamps.attrs["unit"]),
            t_stop=pq.Quantity(timestamps.data[-1], timestamps.attrs["unit"]),
            waveforms=pq.Quantity(waveforms.data, waveforms.attrs["unit"]))
        spike_trains.append(spike_train)
        # TODO: read attrs?
    return spike_trains
    References
    ----------
    .. [1] H. Shimazaki and S. Shinomoto, J Comput Neurosci (2010) 29:171–182.
    """
    # Merge spike trains if a list of spike trains is given:
    if isinstance(spiketrain, list):
        _check_consistency_of_spiketrainlist(
            spiketrain, t_start=t_start, t_stop=t_stop)
        if t_start is None:
            t_start = spiketrain[0].t_start
        if t_stop is None:
            t_stop = spiketrain[0].t_stop
        spikes = np.concatenate([st.magnitude for st in spiketrain])
        merged_spiketrain = SpikeTrain(np.sort(spikes),
                                       units=spiketrain[0].units,
                                       t_start=t_start, t_stop=t_stop)
        return instantaneous_rate(merged_spiketrain,
                                  sampling_period=sampling_period,
                                  kernel=kernel, cutoff=cutoff,
                                  t_start=t_start, t_stop=t_stop, trim=trim)
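As a usage note for the branch above: in this version of the function, a list of spike trains is merged into a single train before the rate is estimated. A hedged sketch of a call, assuming elephant's instantaneous_rate and GaussianKernel are available (how lists are handled differs across elephant versions):

import quantities as pq
from neo import SpikeTrain
from elephant.kernels import GaussianKernel
from elephant.statistics import instantaneous_rate

# two short trains over the same 100 ms window (made-up times)
sts = [SpikeTrain([10., 40., 90.] * pq.ms, t_stop=100. * pq.ms),
       SpikeTrain([15., 60.] * pq.ms, t_stop=100. * pq.ms)]
rate = instantaneous_rate(sts, sampling_period=1 * pq.ms,
                          kernel=GaussianKernel(sigma=10 * pq.ms))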
    # Checks of input variables:
    if not isinstance(spiketrain, SpikeTrain):
        raise TypeError(
            "spiketrain must be an instance of :class:`SpikeTrain` of Neo!\n"
            "    Found: %s, value %s" % (type(spiketrain), str(spiketrain)))
    if not (isinstance(sampling_period, pq.Quantity) and
            sampling_period.dimensionality.simplified ==
            pq.Quantity(1, "s").dimensionality):
        raise TypeError(
            "The sampling period must be a time quantity!\n"
            "    Found: %s, value %s" % (type(sampling_period),
                                         str(sampling_period)))
# Extract spikes unit-wise and generate one SpikeTrain per unit
for unit_i in unit_list:
    if not lazy:
        # Extract all time stamps of that neuron on that electrode
        unit_mask = np.where(cell_numbers == unit_i)[0]
        spike_times = timestamps[unit_mask] * self.nse_time_unit
        spike_times = spike_times - self.parameters_global['t_start']
        time_mask = np.where(np.logical_and(spike_times >= t_start,
                                            spike_times < t_stop))
        spike_times = spike_times[time_mask]
    else:
        spike_times = pq.Quantity([], units=self.nse_time_unit)

    # Create SpikeTrain object
    st = SpikeTrain(times=spike_times,
                    t_start=t_start,
                    t_stop=t_stop,
                    sampling_rate=self.parameters_ncs[chid]['sampling_rate'],
                    name="Channel %i, Unit %i" % (chid, unit_i),
                    file_origin=filename_nse,
                    unit_id=unit_i,
                    channel_id=chid)

    if waveforms and not lazy:
        # Collect all waveforms of the specific unit
        # For computational reasons: no units, no time axis
        st.waveforms = data_points[unit_mask][time_mask]
        # TODO: Add units to waveforms (pq.uV?) and add annotation
        # left_sweep = x * pq.ms indicating when threshold crossing
        # occurred in waveform
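The TODO above could plausibly be resolved along these lines; this is a sketch only, where the microvolt unit and the left_sweep value are assumptions rather than anything given in the original code:

# Sketch: attach units to the raw waveform array (uV is an assumption) and
# record a threshold-crossing offset as left_sweep
st.waveforms = pq.Quantity(data_points[unit_mask][time_mask], 'uV')
st.left_sweep = 0.2 * pq.ms  # hypothetical placeholder value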