Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): fragment — `result` and `pq_value` are defined before this
# excerpt. These asserts compare a round-tripped UncertainQuantity against
# the original, field by field (magnitude, units, uncertainty).
assert isinstance(result, pq.UncertainQuantity)
assert result.magnitude.tolist() == pq_value.magnitude.tolist()
assert result.dimensionality.string == pq_value.dimensionality.string
assert result.uncertainty.magnitude.tolist() == pq_value.uncertainty.magnitude.tolist()
# Dict round-trip: convert_back_quantities is expected to leave already
# converted Quantity/UncertainQuantity values unchanged...
pq_dict = {"quantity": {"unit": "m", "value": 1},
"uq_quantity": {"unit": "m", "uncertainty": [3, 4], "value": [1.0, 2.0]}}
pq_values = {"quantity": pq.Quantity(1, "m"),
"uq_quantity": pq.UncertainQuantity([1, 2], "m", [3, 4])}
result = convert_back_quantities(pq_values)
assert result == pq_values
# ...and to rebuild Quantity objects from their serialized dict form while
# passing plain (non-quantity) entries such as lists through untouched.
pq_values = {"list": [1, 2, 3], "quantity": {"unit": "m", "value": 1}}
result = convert_back_quantities(pq_values)
assert result == {"list": [1, 2, 3], "quantity": pq.Quantity(1, "m")}
# NOTE(review): fragment of a unittest method (the enclosing `def` and
# `self` are outside this excerpt). It checks that ParametersValidator
# accepts quantities with the right units and raises for wrong-unit or
# non-quantity inputs.
paraVal = ParametersValidator()
# test validate_quantity
q = pq.Quantity([1, 2, 3], 'A')
paraVal.validate_quantity(q)
self.assertRaises(
BaseException, paraVal.validate_quantity, "I am not a quantity")
# seconds: valid as time, rejected as voltage or current
q = pq.Quantity([1,2,3], pq.s)
self.assertTrue(paraVal._validate_type_time(q))
self.assertRaises(
BaseException, paraVal._validate_type_voltage, q)
self.assertRaises(
BaseException, paraVal._validate_type_current, q)
# volts: valid as voltage, rejected as time or current
q = pq.Quantity([1,2,3], pq.V)
self.assertTrue(paraVal._validate_type_voltage(q))
self.assertRaises(
BaseException, paraVal._validate_type_time, q)
self.assertRaises(
BaseException, paraVal._validate_type_current, q)
# amperes: valid as current, rejected as voltage or time
q = pq.Quantity([1,2,3], pq.A)
self.assertTrue(paraVal._validate_type_current(q))
self.assertRaises(
BaseException, paraVal._validate_type_voltage, q)
self.assertRaises(
BaseException, paraVal._validate_type_time, q)
self.assertRaises(
BaseException, paraVal._validate_type_current, "I am not a quantity")
# NOTE(review): the call below is truncated in this excerpt — its
# arguments continue past the visible region.
self.assertRaises(
# NOTE(review): fragment of a Quantity-backed signal constructor
# (presumably a `__new__` of an AnalogSignal-like class — confirm against
# the full file). The branch structure is truncated: an `if/else` followed
# directly by `elif` is not valid Python, so at least one earlier branch
# over `times` is missing from this excerpt.
if hasattr(times, "units"):
time_units = times.units
else:
raise ValueError("Time units must be specified")
elif isinstance(times, pq.Quantity):
# could improve this test, what if units is a string?
if time_units != times.units:
times = times.rescale(time_units)
# should check time units have correct dimensions
obj = pq.Quantity.__new__(cls, signal, units=units, dtype=dtype, copy=copy)
# a 1-D signal is promoted to a single-channel 2-D array (samples x 1)
if obj.ndim == 1:
obj = obj.reshape(-1, 1)
# times must align one-to-one with the sample axis
if len(times) != obj.shape[0]:
raise ValueError("times array and signal array must "
"have same length")
obj.times = pq.Quantity(times, units=time_units, dtype=float, copy=copy)
obj.segment = None
obj.channel_index = None
return obj
def _default_units(sig):
    """
    If ``plots`` is missing ``units`` in ``metadata``, this function determines
    default units.
    """
    # Preferred display units, keyed by a representative source unit symbol.
    preferred = {
        'V': 'uV',  # convert voltages to microvolts
        'N': 'mN',  # convert forces to millinewtons
    }
    # Re-key by simplified dimensionality so any unit of the same physical
    # dimension (e.g. mV, kV for 'V') maps to the preferred target unit.
    by_dimension = {}
    for symbol, target in preferred.items():
        dim = pq.Quantity(1, symbol).dimensionality.simplified
        by_dimension[dim] = target
    # Fall back to the signal's own units when no mapping applies.
    return by_dimension.get(sig.units.dimensionality.simplified, sig.units)
# NOTE(review): fragment of a spiketrain-combining method (the enclosing
# `def` and the `if/elif/else` opening are outside this excerpt). `istrain`
# and `spiketrains` come from earlier context.
goodtrains = [itrain for i, itrain in enumerate(spiketrains)
if istrain[i]]
badtrains = [itrain for i, itrain in enumerate(spiketrains)
if not istrain[i]]
# merge all "bad" trains into a single combined train appended at the end
spiketrains = (goodtrains +
[self._combine_spiketrains(badtrains)])
spiketrains = [itrain for itrain in spiketrains if itrain.size > 0]
if not spiketrains:
return self._default_spiketrain.copy()
# get the times of the spiketrains and combine them
waveforms = [itrain.waveforms for itrain in spiketrains]
rawtrains = np.array(np.concatenate(spiketrains, axis=1))
times = pq.Quantity(rawtrains, units=pq.ms, copy=False)
# pad waveforms to a common shape along axes 1 and 2 when they differ
lens1 = np.array([wave.shape[1] for wave in waveforms])
lens2 = np.array([wave.shape[2] for wave in waveforms])
if lens1.max() != lens1.min() or lens2.max() != lens2.min():
lens1 = lens1.max() - lens1
lens2 = lens2.max() - lens2
waveforms = [np.pad(waveform,
((0, 0), (0, len1), (0, len2)),
'constant')
for waveform, len1, len2 in zip(waveforms,
lens1,
lens2)]
# NOTE(review): the next four lines appear to be a garbled partial
# duplicate of the block that follows them (`lens2)]` repeats the
# comprehension's closing line, and the truncated `trig2 = ...` is
# restated in full below) — this excerpt is not valid Python as shown.
waveforms = np.concatenate(waveforms, axis=0)
# extract the trig2 annotation
trig2 = np.array(np.concatenate([itrain.annotations['trig2'] for
lens2)]
waveforms = np.concatenate(waveforms, axis=0)
# extract the trig2 annotation
trig2 = np.array(np.concatenate([itrain.annotations['trig2'] for
itrain in spiketrains], axis=1))
trig2 = pq.Quantity(trig2, units=pq.ms)
elif hasattr(spiketrains[0], 'units'):
# a single spiketrain-like object: recurse with it wrapped in a list
return self._combine_spiketrains([spiketrains])
else:
# spiketrains is a sequence of (times, waveforms, trig2) tuples
times, waveforms, trig2 = zip(*spiketrains)
times = np.concatenate(times, axis=0)
# get the times of the SpikeTrains and combine them
times = pq.Quantity(times, units=pq.ms, copy=False)
# get the waveforms of the SpikeTrains and combine them
# these should be a 3D array with the first axis being the spike,
# the second axis being the recording channel (there is only one),
# and the third axis being the actual waveform
waveforms = np.concatenate(waveforms, axis=0)
# extract the trig2 annotation
trig2 = pq.Quantity(np.hstack(trig2),
units=pq.ms, copy=False)
if not times.size:
return self._default_spiketrain.copy()
# get the maximum time
# NOTE(review): t_stop is set to twice the last spike time — presumably a
# heuristic upper bound; assumes `times` is sorted ascending — confirm.
t_stop = times[-1] * 2.
def _inventoryToScope(self):
    """
    Translate information in `inventory` into class attributes.

    For each inventory entry ``u``, the SI value is rescaled to the active
    unit system (``self._unitSystem``) and exposed three ways:

    * ``self.<symbol>``       -- plain magnitude (float)
    * ``self.f_<symbol>``     -- full quantity with units
    * ``self.f_<symbol>_err`` -- uncertainty as a quantity with units
    """
    # The inventory keys themselves are unused (the original bound them to
    # an unused `k`); iterate over values only.
    for u in six.itervalues(self.inventory):
        val = _pq.Quantity(numpy.float64(u["valueSI"]), u["units"]["SI"])
        value = val.rescale(u["units"][self._unitSystem])
        setattr(self, u["symbol"], value.magnitude)
        setattr(self, "f_" + u["symbol"], value)
        err = _pq.Quantity(numpy.float64(u["errSI"]), u["units"]["SI"])
        err = err.rescale(u["units"][self._unitSystem])
        setattr(self, "f_" + u["symbol"] + "_err", err)
# NOTE(review): fragment of a Neuralynx .ncs reading method (the enclosing
# `def`, `chid`, `sig`, `t_start`, `t_stop` etc. come from earlier context).
# Flatten the per-packet 2-D sample array to one long 1-D signal.
sig = sig.reshape(len(sig) * len(sig[0]))
# ADBitVolts is not guaranteed to be present in the header!
if 'ADBitVolts' in self.parameters_ncs[chid]:
sig *= self.parameters_ncs[chid]['ADBitVolts']
unit = pq.V
else:
warnings.warn(
'Could not transform data from file %s into physical '
'signal. '
'Missing "ADBitVolts" value in text header.')
# NOTE(review): the warning above contains a bare '%s' but no formatting
# argument is supplied, so the literal '%s' is emitted. Also `unit` is
# not assigned on this branch; its use below would raise NameError unless
# it is bound before this excerpt — verify upstream.
# defining sampling rate for rescaling purposes
sampling_rate = self.parameters_ncs[chid]['sampling_unit'][0]
# creating neo AnalogSignal containing data
anasig = AnalogSignal(signal=pq.Quantity(sig, unit, copy=False),
sampling_rate=1 * sampling_rate,
# rescaling t_start to sampling time units
t_start=(header_time_data[p_id_start] * self.ncs_time_unit
- self.parameters_global['t_start']).rescale(
1 / sampling_rate),
name='channel_%i' % (chid),
channel_index=chid)
# removing protruding parts of first and last data packet
if anasig.t_start < t_start.rescale(anasig.t_start.units):
anasig = anasig.time_slice(t_start.rescale(anasig.t_start.units),
None)
if anasig.t_stop > t_stop.rescale(anasig.t_start.units):
anasig = anasig.time_slice(None,
t_stop.rescale(anasig.t_start.units))
def _read_array(self, group, name):
if group.__contains__(name) == False:
return None
array = group[name][:]
if group[name].attrs.get('dimensionality') is not None:
return pq.Quantity(array, group[name].attrs['dimensionality'])
return array
# NOTE(review): fragment of an Igor binary-wave (.ibw) loader (the
# enclosing method header and `content` are outside this excerpt).
# NOTE(review): input validation via `assert` is stripped under `python -O`;
# raising ValueError would be more robust — flag for the author.
assert content['padding'].size == 0, \
"Cannot handle non-empty padding"
signal = content['wData']
note = content['note']
header = content['wave_header']
name = str(header['bname'].decode('utf-8'))
units = "".join([x.decode() for x in header['dataUnits']])
# NOTE(review): bare `except:` below also swallows KeyboardInterrupt /
# SystemExit; narrowing to (KeyError, AssertionError) looks intended.
try:
time_units = "".join([x.decode() for x in header['xUnits']])
assert len(time_units)
except:
time_units = "s"
# header layout differs between wave versions: 'hsB'/'hsA' vs 'sfB'/'sfA'
try:
t_start = pq.Quantity(header['hsB'], time_units)
except KeyError:
t_start = pq.Quantity(header['sfB'][0], time_units)
# NOTE(review): another bare `except:`; KeyError would match the t_start
# handler above — flag for the author.
try:
sampling_period = pq.Quantity(header['hsA'], time_units)
except:
sampling_period = pq.Quantity(header['sfA'][0], time_units)
# user-supplied note parser is optional; fall back to the raw note text
if self.parse_notes:
try:
annotations = self.parse_notes(note)
except ValueError:
warn("Couldn't parse notes field.")
annotations = {'note': note}
else:
annotations = {'note': note}
signal = AnalogSignal(signal, units=units, copy=False, t_start=t_start,
sampling_period=sampling_period, name=name,
file_origin=self.filename, **annotations)