def test_compound_quantities(self):
    m1 = skdb.Unit("m**2/m**3")
    m2 = quantities.CompoundUnit("m**2/m**3")
    self.assertTrue(m1.compatible(m2))
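A minimal standalone check of the same idea, using only the quantities package (skdb is not needed here and the values are illustrative): a CompoundUnit keeps its symbolic form but simplifies to the dimensionality that compatibility checks compare against.

import quantities as pq

# Sketch only: "m**2/m**3" simplifies to 1/m.
cu = pq.CompoundUnit("m**2/m**3")
print(cu.dimensionality.simplified)            # 1/m
print(pq.Quantity(2.0, cu).rescale(pq.m**-1))  # 2.0 1/m
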
Returns
-------
pq.quantities.Quantity object, the updated sampling period.
"""
if sampling_period is None:
    if time_column is not None:
        data_sampling = np.unique(
            np.diff(sorted(np.unique(data[:, 1]))))
        if len(data_sampling) > 1:
            raise ValueError('Different sampling distances found in '
                             'data set (%s)' % data_sampling)
        else:
            dt = data_sampling[0]
    else:
        raise ValueError('Can not estimate sampling rate without time '
                         'column id provided.')
    sampling_period = pq.CompoundUnit(str(dt) + '*'
                                      + time_unit.units.u_symbol)
elif not isinstance(sampling_period, pq.UnitQuantity):
    raise ValueError("sampling_period is not specified as a unit.")
return sampling_period
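As a rough sketch of what the branch above returns (dt and the time unit are invented here, not derived from a real data set), the sampling period comes back as a compound unit that rescales cleanly to seconds:

import quantities as pq

dt = 0.5           # hypothetical sampling distance found in the time column
time_unit = pq.ms  # assumed unit of the time column
sampling_period = pq.CompoundUnit(str(dt) + '*' + time_unit.u_symbol)
print(pq.Quantity(1, sampling_period).rescale('s'))  # 0.0005 s
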
def create_quantity(values, unitstr):
    if "*" in unitstr:
        unit = pq.CompoundUnit(stringify(unitstr))
    else:
        unit = unitstr
    return pq.Quantity(values, unit)
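A self-contained variant of the same dispatch, for illustration only (make_quantity is a made-up name, and str() stands in for the stringify helper used above):

import numpy as np
import quantities as pq

def make_quantity(values, unitstr):
    # Compound expressions such as "1.0/30000*s" need pq.CompoundUnit;
    # plain unit names can be handed to pq.Quantity directly.
    if "*" in unitstr:
        unit = pq.CompoundUnit(str(unitstr))
    else:
        unit = unitstr
    return pq.Quantity(values, unit)

print(make_quantity(np.arange(3), "mV"))
print(make_quantity(np.arange(3), "1.0/30000*s"))
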
    'min_analog_val':
        self.__nsx_ext_header[nsx_nb]['min_analog_val'],
    'max_analog_val':
        self.__nsx_ext_header[nsx_nb]['max_analog_val'],
    'min_digital_val':
        self.__nsx_ext_header[nsx_nb]['min_digital_val'],
    'max_digital_val':
        self.__nsx_ext_header[nsx_nb]['max_digital_val'],
    'timestamp_resolution':
        self.__nsx_basic_header[nsx_nb]['timestamp_resolution'],
    'bytes_in_headers':
        self.__nsx_basic_header[nsx_nb]['bytes_in_headers'],
    'sampling_rate':
        self.__nsx_basic_header[nsx_nb]['timestamp_resolution']
        / self.__nsx_basic_header[nsx_nb]['period'] * pq.Hz,
    'time_unit': pq.CompoundUnit("1.0/{0}*s".format(
        self.__nsx_basic_header[nsx_nb]['timestamp_resolution']
        / self.__nsx_basic_header[nsx_nb]['period']))}

return nsx_parameters[param_name]

    'channel_ids': self.__nev_ext_header[b'NEUEVWAV']['electrode_id'],
    'channel_labels': self.__channel_labels[self.__nev_spec](),
    'event_unit': pq.CompoundUnit("1.0/{0} * s".format(
        self.__nev_basic_header['timestamp_resolution'])),
    'nb_units': dict(zip(
        self.__nev_ext_header[b'NEUEVWAV']['electrode_id'],
        self.__nev_ext_header[b'NEUEVWAV']['nb_sorted_units'])),
    'digitization_factor': dict(zip(
        self.__nev_ext_header[b'NEUEVWAV']['electrode_id'],
        self.__nev_ext_header[b'NEUEVWAV']['digitization_factor'])),
    'data_size': self.__nev_basic_header['bytes_in_data_packets'],
    'waveform_size': self.__waveform_size[self.__nev_spec](),
    'waveform_dtypes': self.__get_waveforms_dtype(),
    'waveform_sampling_rate':
        self.__nev_basic_header['sample_resolution'] * pq.Hz,
    'waveform_time_unit': pq.CompoundUnit("1.0/{0} * s".format(
        self.__nev_basic_header['sample_resolution'])),
    'waveform_unit': pq.uV}

return nev_parameters[param_name]
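Both parameter dictionaries above express timestamps through a "1.0/resolution * s" compound unit. A toy example of why this is convenient (the 30000 Hz resolution matches the value used elsewhere in this listing; the timestamps are made up):

import quantities as pq

timestamp_resolution = 30000  # assumed clock resolution in Hz
event_unit = pq.CompoundUnit("1.0/{0} * s".format(timestamp_resolution))
timestamps = pq.Quantity([0, 30000, 45000], event_unit)
print(timestamps.rescale('s'))  # [0.  1.  1.5] s
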
nsx_parameters = {
    'nb_data_points': int(
        (self.__get_file_size(filename) - bytes_in_headers)
        / (2 * self.__nsx_basic_header[nsx_nb]['channel_count']) - 1),
    'labels': labels,
    'units': np.array(
        [units] * self.__nsx_basic_header[nsx_nb]['channel_count']),
    'min_analog_val': -1 * np.array(dig_factor),
    'max_analog_val': np.array(dig_factor),
    'min_digital_val': np.array(
        [-1000] * self.__nsx_basic_header[nsx_nb]['channel_count']),
    'max_digital_val': np.array(
        [1000] * self.__nsx_basic_header[nsx_nb]['channel_count']),
    'timestamp_resolution': 30000,
    'bytes_in_headers': bytes_in_headers,
    'sampling_rate':
        30000 / self.__nsx_basic_header[nsx_nb]['period'] * pq.Hz,
    'time_unit': pq.CompoundUnit("1.0/{0}*s".format(
        30000 / self.__nsx_basic_header[nsx_nb]['period']))}

# Return the complete dictionary so it does not need to be rebuilt on every call.
return nsx_parameters

self.sampling_rate = sig_chans['sampling_rate'][0] * pq.Hz
self.sampling_period = 1. / self.sampling_rate
sigs_size = self._rawio.get_signal_size(block_index=block_index, seg_index=seg_index,
                                        channel_indexes=self._global_channel_indexes)
self.shape = (sigs_size, self._nb_chan)
self.t_start = self._rawio.get_signal_t_start(block_index, seg_index,
                                              self._global_channel_indexes) * pq.s

# magnitude_mode='raw' is supported only if all offsets are 0
# and all gains are identical
support_raw_magnitude = np.all(sig_chans['gain'] == sig_chans['gain'][0]) and \
    np.all(sig_chans['offset'] == 0.)
if support_raw_magnitude:
    str_units = ensure_signal_units(sig_chans['units'][0]).units.dimensionality.string
    self._raw_units = pq.CompoundUnit('{}*{}'.format(sig_chans['gain'][0], str_units))
else:
    self._raw_units = None

# set both the necessary attributes and the annotations
annotations = {}
annotations['name'] = self._make_name(None)
if len(sig_chans) == 1:
    # when there is only one channel, its raw_annotations become the standard annotations
    d = self._rawio.raw_annotations['blocks'][block_index]['segments'][seg_index][
        'signals'][self._global_channel_indexes[0]]
    annotations.update(d)

array_annotations = {
    'channel_names': np.array(sig_chans['name'], copy=True),
    'channel_ids': np.array(sig_chans['id'], copy=True),
}
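The _raw_units construction above folds the per-channel gain into a compound unit, so raw integer samples can be rescaled to physical values on demand. A toy illustration with invented numbers (a gain of 0.25 uV per ADC step):

import numpy as np
import quantities as pq

gain, str_units = 0.25, 'uV'  # hypothetical values, not read from a file
raw_units = pq.CompoundUnit('{}*{}'.format(gain, str_units))
raw_signal = pq.Quantity(np.array([0, 4, -8], dtype='int16'), raw_units)
print(raw_signal.rescale(pq.uV))  # [ 0.  1. -2.] uV
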
if not (t_start is None or (isinstance(t_start, pq.Quantity) and
                            t_start.dimensionality.simplified ==
                            pq.Quantity(1, "s").dimensionality)):
    raise TypeError("t_start must be a time quantity!")
if not (t_stop is None or (isinstance(t_stop, pq.Quantity) and
                           t_stop.dimensionality.simplified ==
                           pq.Quantity(1, "s").dimensionality)):
    raise TypeError("t_stop must be a time quantity!")
if not isinstance(trim, bool):
    raise TypeError("trim must be bool!")

# main function:
units = pq.CompoundUnit(
    "%s*s" % str(sampling_period.rescale('s').magnitude))
spiketrain = spiketrain.rescale(units)
if t_start is None:
    t_start = spiketrain.t_start
else:
    t_start = t_start.rescale(spiketrain.units)
if t_stop is None:
    t_stop = spiketrain.t_stop
else:
    t_stop = t_stop.rescale(spiketrain.units)
time_vector = np.zeros(int((t_stop - t_start)) + 1)
spikes_slice = spiketrain.time_slice(t_start, t_stop) \
    if len(spiketrain) else np.array([])
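To see what the rescaling above accomplishes (the spike times and sampling period here are invented), expressing the times in a compound unit equal to the sampling period turns them directly into bin indices for the time vector:

import quantities as pq

sampling_period = 2 * pq.ms                       # assumed bin width
spike_times = pq.Quantity([1.0, 4.0, 9.0], 'ms')  # toy spike train
units = pq.CompoundUnit(
    "%s*s" % str(sampling_period.rescale('s').magnitude))
print(spike_times.rescale(units).magnitude)  # [0.5 2.  4.5] -> bins after rounding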