Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_works_earth(self):
H_p = 8500 * pq.m
R_p = 1 * aq.R_e
R_s = 1 * aq.R_s
answer = 1.12264e-06 * pq.dimensionless
result = ratioTerminatorToStar(H_p, R_p, R_s)
self.assertTrue(answer - result < 0.001)
# needed for Python3 compatibility
from __future__ import absolute_import
import os.path
import warnings
from datetime import datetime
import numpy as np
import quantities as pq
from neo.io.baseio import BaseIO
from neo.core import Block, Segment, SpikeTrain, AnalogSignal
value_type_dict = {'V': pq.mV,
'I': pq.pA,
'g': pq.CompoundUnit("10^-9*S"),
'no type': pq.dimensionless}
class NestIO(BaseIO):
"""
Class for reading NEST output files. GDF files for the spike data and DAT
files for analog signals are possible.
Usage:
>>> from neo.io.nestio import NestIO
>>> files = ['membrane_voltages-1261-0.dat',
'spikes-1258-0.gdf']
>>> r = NestIO(filenames=files)
>>> seg = r.read_segment(gid_list=[], t_start=400 * pq.ms,
t_stop=600 * pq.ms,
id_column_gdf=0, time_column_gdf=1,
List of neo.SpikeTrains with different firing rates, forming
a CPP with amplitude distribution A
"""
# Computation of Parameters of the two CPPs that will be merged
# (uncorrelated with heterog. rates + correlated with homog. rates)
N = len(rate) # number of output spike trains
A_exp = np.dot(A, range(N + 1)) # expectation of A
r_sum = np.sum(rate) # sum of all output firing rates
r_min = np.min(rate) # minimum of the firing rates
r1 = r_sum - N * r_min # rate of the uncorrelated CPP
r2 = r_sum / float(A_exp) - r1 # rate of the correlated CPP
r_mother = r1 + r2 # rate of the hidden mother process
# Check the analytical constraint for the amplitude distribution
if A[1] < (r1 / r_mother).rescale(dimensionless).magnitude:
raise ValueError('A[1] too small / A[i], i>1 too high')
# Compute the amplitude distrib of the correlated CPP, and generate it
a = [(r_mother * i) / float(r2) for i in A]
a[1] = a[1] - r1 / float(r2)
CPP = _cpp_hom_stat(a, t_stop, r_min, t_start)
# Generate the independent heterogeneous Poisson processes
POISS = [
homogeneous_poisson_process(i - r_min, t_start, t_stop) for i in rate]
# Pool the correlated CPP and the corresponding Poisson processes
out = [_pool_two_spiketrains(CPP[i], POISS[i]) for i in range(N)]
return out
# load waveforms if requested
if load_waveforms and not lazy:
wf_dtype = self.__nev_params('waveform_dtypes')[channel_id]
wf_size = self.__nev_params('waveform_size')[channel_id]
waveforms = spikes['waveform'].flatten().view(wf_dtype)
waveforms = waveforms.reshape(int(spikes.size), 1, int(wf_size))
if scaling == 'voltage':
st.waveforms = (
waveforms[mask] * self.__nev_params('waveform_unit')
* self.__nev_params('digitization_factor')[channel_id]
/ 1000.)
elif scaling == 'raw':
st.waveforms = waveforms[mask] * pq.dimensionless
else:
raise ValueError(
'Unkown option {1} for parameter scaling.'.format(scaling))
st.sampling_rate = self.__nev_params('waveform_sampling_rate')
st.left_sweep = self.__get_left_sweep_waveforms()[channel_id]
# add additional annotations
st.annotate(
unit_id=int(unit_id),
channel_id=int(channel_id))
return st
The analog signal containing the discretization of the function to
interpolate
times : quantities.Quantity (vector of time points)
The time points at which the step interpolation is computed
Returns
-------
quantities.Quantity object with same shape of `times`, and containing
the values of the interpolated signal at the time points in `times`
'''
dt = signal.sampling_period
# Compute the ids of the signal times to the left of each time in times
time_ids = np.floor(
((times - signal.t_start) / dt).rescale(
pq.dimensionless).magnitude).astype('i')
return (signal.magnitude[time_ids] * signal.units).rescale(signal.units)
x_label = pair[0].value_name + '(' + pair[0].value_units.dimensionality.latex + ')'
y_label = pair[1].value_name + '(' + pair[1].value_units.dimensionality.latex + ')'
for p in p1:
x_label += '\n' + str(p) + " = " + str(pair[0].getParamValue(p))
y_label += '\n' + str(p) + " = " + str(pair[1].getParamValue(p))
for p in p2:
x_label += '\n' + str(p) + " = " + str(MozaikParametrized.idd(pair[0].stimulus_id).getParamValue(p))
y_label += '\n' + str(p) + " = " + str(MozaikParametrized.idd(pair[1].stimulus_id).getParamValue(p))
params = {}
params["x_label"] = x_label
params["y_label"] = y_label
params["title"] = self.sheets[idx]
if pair[0].value_units != pair[1].value_units or pair[1].value_units == pq.dimensionless:
params["equal_aspect_ratio"] = False
ids = list(set(pair[0].ids) & set(pair[1].ids))
x = pair[0].get_value_by_id(ids)
y = pair[1].get_value_by_id(ids)
if self.parameters.ignore_nan:
idxs = numpy.logical_not(numpy.logical_or(numpy.isnan(numpy.array(x)),numpy.isnan(numpy.array(y))))
x = numpy.array(x)[idxs]
y = numpy.array(y)[idxs]
return [("ScatterPlot",ScatterPlot(x,y),gs,params)]
# transform dig value to physical value
sym_ana = (max_ana[idx_ch] == -min_ana[idx_ch])
sym_dig = (max_dig[idx_ch] == -min_dig[idx_ch])
if sym_ana and sym_dig:
sig_ch *= float(max_ana[idx_ch]) / float(max_dig[idx_ch])
else:
# general case (same result as above for symmetric input)
sig_ch -= min_dig[idx_ch]
sig_ch *= float(max_ana[idx_ch] - min_ana[idx_ch]) / \
float(max_dig[idx_ch] - min_dig[idx_ch])
sig_ch += float(min_ana[idx_ch])
sig_unit = units[idx_ch].decode()
elif scaling == 'raw':
sig_ch = signal[dbl_idx][:, idx_ch][mask].astype(int)
sig_unit = pq.dimensionless
else:
raise ValueError(
'Unkown option {1} for parameter '
'scaling.'.format(scaling))
t_start = data_times[0].rescale(nsx_time_unit)
anasig = AnalogSignal(
signal=pq.Quantity(sig_ch, sig_unit, copy=False),
sampling_rate=sampling_rate,
t_start=t_start,
name=labels[idx_ch],
description=description,
file_origin='.'.join([self._filenames['nsx'], 'ns%i' % nsx_nb]))
if lazy:
anasig.lazy_shape = lazy_shape
"""
# Computing the population histogram with parameter binary=True to clip the
# spike trains before summing
pophist = time_histogram(spiketrains, binsize, binary=True)
# Computing the histogram of the entries of pophist (=Complexity histogram)
complexity_hist = np.histogram(
pophist.magnitude, bins=range(0, len(spiketrains) + 2))[0]
# Normalization of the Complexity Histogram to 1 (probabilty distribution)
complexity_hist = complexity_hist / complexity_hist.sum()
# Convert the Complexity pdf to an neo.AnalogSignal
complexity_distribution = neo.AnalogSignal(
np.array(complexity_hist).reshape(len(complexity_hist), 1) *
pq.dimensionless, t_start=0 * pq.dimensionless,
sampling_period=1 * pq.dimensionless)
return complexity_distribution
if 'waveforms' in sorting.get_unit_spike_feature_names(unit):
if len(waveforms) == 0:
waveforms = sorting.get_unit_spike_features(unit, 'waveforms')
else:
waveforms = np.vstack((waveforms, sorting.get_unit_spike_features(unit, 'waveforms')))
if save_waveforms:
if verbose:
print("Saving EventWaveforms")
if 'waveforms' in sorting.get_shared_unit_spike_feature_names():
eventwaveform = ch_group.require_group('EventWaveform')
waveform_ts = eventwaveform.require_group('waveform_timeseries')
data = waveform_ts.require_dataset('data', data=waveforms)
data.attrs['num_samples'] = len(waveforms)
data.attrs['sample_rate'] = sampling_frequency
data.attrs['unit'] = pq.dimensionless
times = waveform_ts.require_dataset('timestamps', data=timestamps)
times.attrs['num_samples'] = len(timestamps)
times.attrs['unit'] = pq.s
waveform_ts.attrs['electrode_group_id'] = chan
if recording is not None:
waveform_ts.attrs['electrode_identities'] = np.array([])
waveform_ts.attrs['electrode_idx'] = np.array([ch for i_c, ch in
enumerate(recording.get_channel_ids())
if recording.get_channel_groups(ch) == chan])
waveform_ts.attrs['start_time'] = 0 * pq.s
if recording_stop_time is not None:
waveform_ts.attrs[
'stop_time'] = recording_stop_time if recording_stop_time > unit_stop_time \
else unit_stop_time
waveform_ts.attrs['sample_rate'] = sampling_frequency
waveform_ts.attrs['sample_length'] = waveforms.shape[1]
# collapse against all parameters other then trial
(mean_rates, s) = colapse(mean_rates, stids, parameter_list=['trial'])
# mean_rates is the result of collapsing the whole multidimensional array of recordings (with several parameters)
# into a two-dimensional array having one line with 'trials' for each combination of the other parameters.
# In the case of sparseness, we consider as stimulus each combination of parameters other than 'trial'.
# Hence, to get the number of stimuli we just count the number of lines of mean_rates:
nstim = len(mean_rates)
# take the mean of each rate over trials
mean_rates = [sum(a)/len(a) for a in mean_rates]
# and computing the activity ratio
sparseness = numpy.power(numpy.sum(numpy.array(mean_rates),axis=0)/nstim,2) / (numpy.sum(numpy.power(mean_rates,2),axis=0)/nstim)
logger.debug('Adding PerNeuronValue containing trial averaged sparseness to datastore')
self.datastore.full_datastore.add_analysis_result(
PerNeuronValue( sparseness, segs[0].get_stored_spike_train_ids(), qt.dimensionless,
stimulus_id=None,
value_name='Sparseness',
sheet_name=sheet,
tags=self.tags,
analysis_algorithm=self.__class__.__name__,
period=None))