Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _create_dataset(self):
    """Set up the sizes and random arrays of a synthetic recording.

    The channel, timepoint, event and unit counts are stored in
    ``self.dataset``. Standard-normal arrays are then drawn for the
    recordings, channel positions, unit templates, peaks and sources.
    """
    n_channels = 32
    n_timepoints = 10000
    n_units = 10
    n_events = 150
    template_len = 224
    fs = 30000 * pq.Hz
    duration = n_timepoints / fs

    self.dataset = {
        'num_channels': n_channels,
        'num_timepoints': n_timepoints,
        'num_events': n_events,
        'num_units': n_units,
    }

    # Ingredients for the neo spike trains.
    # NOTE(review): nothing below (nor ``duration``) is consumed in the
    # visible part of this method -- presumably the snippet is truncated;
    # confirm against the full source before removing anything.
    times = np.arange(n_timepoints)
    # The draws stay in this order so the random stream is consumed
    # identically.
    recordings = np.random.randn(n_channels, n_timepoints)
    positions = np.random.randn(n_channels, 3)
    templates = np.random.randn(n_units, template_len)
    peaks = np.random.randn(n_units, n_channels)
    sources = np.random.randn(n_units, n_timepoints)
    spiketrains = []
def __init__(self, transform, win_len=0.1 * pq.s,
             samplerate=500 * pq.Hz, verbose=False, n_samples=np.inf,
             scaling='dB', dynamic_range=60., center=True,
             normalize=True, timesteps=None,
             multi_spike_warning=True):
    """Store the caller's configuration on the instance.

    Every argument except ``timesteps`` is saved unchanged as an
    attribute of the same name.

    Arguments:
        transform: stored as ``self.transform``.
        win_len: stored as ``self.win_len``. Default 0.1 s.
        samplerate: stored as ``self.samplerate``. Default 500 Hz.
        verbose: stored as ``self.verbose``. Default False.
        n_samples: stored as ``self.n_samples``. Default infinity.
        scaling: stored as ``self.scaling``. Default 'dB'.
        dynamic_range: stored as ``self.dynamic_range``. Default 60.
        center: stored as ``self.center``. Default True.
        normalize: stored as ``self.normalize``. Default True.
        timesteps: accepted but not stored here (see note below).
            Default None.
        multi_spike_warning: stored as ``self.multi_spike_warning``.
            Default True.
    """
    # The default for ``n_samples`` is spelled ``np.inf``: the ``np.Inf``
    # alias was removed in NumPy 2.0 and would raise AttributeError as
    # soon as this ``def`` is evaluated. The value is the same float.
    self.transform = transform
    self.win_len = win_len
    self.samplerate = samplerate
    self.verbose = verbose
    self.n_samples = n_samples
    self.scaling = scaling
    self.dynamic_range = dynamic_range
    self.center = center
    self.normalize = normalize
    self.multi_spike_warning = multi_spike_warning
    # NOTE(review): ``timesteps`` is never assigned in the visible code --
    # either the snippet is truncated or the argument is silently
    # dropped; confirm against the full source.
def read_protocol(self):
    """
    Read the protocol waveform of the file, if present.

    This works with ABF2 only. Protocols can be reconstructed from the
    ABF1 header.

    Returns: a list of segments (one for every episode), each holding a
    list of analog signals (one for every DAC).
    """
    raw_sigs, names, units = self.read_raw_protocol()

    episodes = []
    for episode_index, episode_sigs in enumerate(raw_sigs):
        episode = Segment(index=episode_index)
        # Every signal of an episode shares that episode's start time.
        episode_start = self._t_starts[episode_index] * pq.s
        for dac_index, samples in enumerate(episode_sigs):
            episode.analogsignals.append(
                AnalogSignal(
                    samples,
                    sampling_rate=self._sampling_rate * pq.Hz,
                    t_start=episode_start,
                    name=names[dac_index],
                    units=units[dac_index],
                )
            )
        episodes.append(episode)
    return episodes
# NOTE(review): fragment -- the enclosing function header and the condition
# guarding this raise are outside the visible source, and the original
# indentation has been lost.
raise ValueError("Incompatible binning of spike train and LFP")
# Exclusive end index of the slice that the binned spike train occupies.
right_edge = int(left_edge + spiketrain.num_bins)
# Duplicate the spike train: write the binned train into a single zero row of
# length len_signals, repeat that row num_signals times, then transpose so
# that each copy becomes one column.
spiketrain_array = np.zeros((1, len_signals))
spiketrain_array[0, left_edge:right_edge] = spiketrain.to_array()
spiketrains_array = np.repeat(spiketrain_array, repeats=num_signals, axis=0).transpose()
# Calculate the coherence along axis 0, using the signal's sampling rate
# expressed in Hz; any extra keyword arguments go straight to scipy.
frequencies, sfc = scipy.signal.coherence(
spiketrains_array, signal.magnitude,
fs=signal.sampling_rate.rescale('Hz').magnitude,
axis=0, **kwargs)
# Return order: the coherence (dimensionless) first, the frequencies (Hz)
# second.
return (pq.Quantity(sfc, units=pq.dimensionless),
pq.Quantity(frequencies, units=pq.Hz))
# NOTE(review): fragment -- the enclosing function header is outside the
# visible source, the original indentation has been lost, and the final
# statement is cut off mid-call. Nesting described below is presumed.
# Count the spikes per index of time_vector; int() truncates the offset
# (spike - t_start) toward zero.
for spike in spikes_slice:
index = int((spike - t_start))
time_vector[index] += 1
# Raise the cutoff to the kernel's minimum and warn about the adjustment.
if cutoff < kernel.min_cutoff:
cutoff = kernel.min_cutoff
warnings.warn("The width of the kernel was adjusted to a minimally "
"allowed width.")
# Time axis for sampling the kernel: it starts at -cutoff * sigma and advances
# by one sampling period; the stop value is +cutoff * sigma plus one sampling
# period (presumably so that the +cutoff * sigma point itself is included).
t_arr = np.arange(-cutoff * kernel.sigma.rescale(units).magnitude,
cutoff * kernel.sigma.rescale(units).magnitude +
sampling_period.rescale(units).magnitude,
sampling_period.rescale(units).magnitude) * units
# Full convolution of the spike counts with the kernel values expressed in Hz.
r = scipy.signal.fftconvolve(time_vector,
kernel(t_arr).rescale(pq.Hz).magnitude, 'full')
# Only warn about negative values; r itself is not modified here.
if np.any(r < -1e-8): # abs tolerance in np.isclose
warnings.warn("Instantaneous firing rate approximation contains "
"negative values, possibly caused due to machine "
"precision errors.")
# Untrimmed: drop median_index samples at the front and
# (kernel size - median_index) samples at the back of the full convolution.
if not trim:
r = r[kernel.median_index(t_arr):-(kernel(t_arr).size -
kernel.median_index(t_arr))]
# Trimmed: drop twice as many samples on each side. This elif is the exact
# complement of the `if not trim` above, so it acts as a plain else.
elif trim:
r = r[2 * kernel.median_index(t_arr):-2 * (kernel(t_arr).size -
kernel.median_index(t_arr))]
# Shift the time bounds inward by the same counts, expressed in the spike
# train's units (presumably only in the trimmed case -- TODO confirm against
# the original indentation).
t_start += kernel.median_index(t_arr) * spiketrain.units
t_stop -= (kernel(t_arr).size -
kernel.median_index(t_arr)) * spiketrain.units
# The rate is passed on as a single column of shape (r.size, 1); the remaining
# arguments of this call are not visible in this excerpt.
rate = neo.AnalogSignal(signal=r.reshape(r.size, 1),
# NOTE(review): fragment of an initialiser -- its header (and the origin of
# nb_spike, block_index, seg_index and unit_index) is outside the visible
# source, and the original indentation has been lost.
self.shape = (nb_spike, )
# Segment bounds as returned by the raw IO, tagged as seconds.
self.t_start = self._rawio.segment_t_start(block_index, seg_index) * pq.s
self.t_stop = self._rawio.segment_t_stop(block_index, seg_index) * pq.s
# Collect both the required attributes and the annotations: 'name' and 'id'
# come from the unit channel header first, then the raw annotations for this
# block/segment/unit are merged in and win on key collisions.
annotations = {}
for k in ('name', 'id'):
annotations[k] = self._rawio.header['unit_channels'][unit_index][k]
ann = self._rawio.raw_annotations['blocks'][block_index]['segments'][seg_index]['units'][unit_index]
annotations.update(ann)
# Waveform metadata is only filled in when the header carries a positive,
# non-NaN waveform sampling rate.
h = self._rawio.header['unit_channels'][unit_index]
wf_sampling_rate = h['wf_sampling_rate']
if not np.isnan(wf_sampling_rate) and wf_sampling_rate > 0:
self.sampling_rate = wf_sampling_rate * pq.Hz
# Left sweep: header value divided by the sampling rate, rescaled to seconds.
self.left_sweep = (h['wf_left_sweep'] / self.sampling_rate).rescale('s')
self._wf_units = ensure_signal_units(h['wf_units'])
else:
# Without a usable rate both attributes are None; note that _wf_units is not
# assigned on this path.
self.sampling_rate = None
self.left_sweep = None
# Forward the collected annotations to the base proxy as keyword arguments.
BaseProxy.__init__(self, **annotations)
def read_segment(self,
cascade = True,
lazy = False,
sampling_rate = 1.*pq.Hz,
t_start = 0.*pq.s,
unit = pq.V,
nbchannel = 1,
bytesoffset = 0,
dtype = 'f4',
rangemin = -10,
rangemax = 10,
):
"""
Reading signal in a raw binary interleaved compact file.
Arguments:
sampling_rate : sample rate
t_start : time of the first sample of each channel
# File extensions listed for this reader.
extensions = ['nse', 'ncs', 'nev', 'ntt']
# mode can be 'file', 'dir', 'fake' or 'database'.
# The main case is 'file', but some readers are based on a directory or a
# database. This information is also used by GUI code.
mode = 'dir'
# Hardcoded parameters taken from the manual; they are not present in the
# Neuralynx data files.
# Unit of the timestamps in the different file types
nev_time_unit = pq.microsecond
ncs_time_unit = pq.microsecond
nse_time_unit = pq.microsecond
ntt_time_unit = pq.microsecond
# Unit of the sampling rate in the different file types
ncs_sr_unit = pq.Hz
nse_sr_unit = pq.Hz
ntt_sr_unit = pq.Hz
def __init__(self, sessiondir=None, cachedir=None, use_cache='hash',
print_diagnostic=False, filename=None):
"""
Arguments:
sessiondir: the directory in which the files of the recording
session are collected. Default 'None'.
print_diagnostic: indicates whether information about the loading
of data is printed in the terminal or not. Default 'False'.
cachedir: the directory where metadata about the recording session
is read from and written to.
use_cache: method used for cache identification. Possible values:
def victor_purpura_dist(
trains, q=1.0 * pq.Hz, kernel=None, sort=True, algorithm='fast'):
"""
Calculates the Victor-Purpura's (VP) distance. It is often denoted as
:math:`D^{\\text{spike}}[q]`.
It is defined as the minimal cost of transforming spike train `a` into
spike train `b` by using the following operations:
* Inserting or deleting a spike (cost 1.0).
* Shifting a spike from :math:`t` to :math:`t'` (cost :math:`q
\\cdot |t - t'|`).
A detailed description can be found in
*Victor, J. D., & Purpura, K. P. (1996). Nature and precision of
temporal coding in visual cortex: a metric-space analysis. Journal of
Neurophysiology.*