Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_write_dataset_datachunkiterator_with_compression(self):
    """Write an H5DataIO-wrapped DataChunkIterator and check data and filter settings.

    A (5, 2, 3) array is streamed through ``DataChunkIterator.from_iterable`` with
    ``buffer_size=2`` and wrapped in ``H5DataIO`` requesting gzip (level 5), shuffle
    and fletcher32. After writing with ``NWBHDF5IO`` the file is reopened with
    ``File`` and the stored dataset is compared against the source array and the
    requested I/O options.
    """
    a = np.arange(30).reshape(5, 2, 3)
    aiter = iter(a)
    daiter = DataChunkIterator.from_iterable(aiter, buffer_size=2)
    wrapped_daiter = H5DataIO(data=daiter,
                              compression='gzip',
                              compression_opts=5,
                              shuffle=True,
                              fletcher32=True)
    ts = TimeSeries('ts_name', wrapped_daiter, 'A', timestamps=np.arange(5))
    self.nwbfile.add_acquisition(ts)
    with NWBHDF5IO(self.path, 'w') as io:
        io.write(self.nwbfile, cache_spec=False)
    with File(self.path, 'r') as f:
        dset = f['/acquisition/ts_name/data']
        self.assertEqual(dset.shape, a.shape)
        self.assertListEqual(dset[:].tolist(), a.tolist())
        self.assertEqual(dset.compression, 'gzip')
        self.assertEqual(dset.compression_opts, 5)
        self.assertEqual(dset.shuffle, True)
        # fletcher32=True is requested above, so verify it was applied as well
        # (assumes ``File`` is h5py.File, whose datasets expose ``fletcher32``).
        self.assertEqual(dset.fletcher32, True)
def test_write_dataset_datachunkiterator_data_and_time(self):
    """Write a TimeSeries whose data and timestamps are both DataChunkIterators.

    The data iterator buffers two elements per chunk; the timestamps iterator uses
    the default buffering. After the write, the stored data is read back and
    compared with the source array.
    """
    expected = np.arange(30).reshape(5, 2, 3)
    data_chunks = DataChunkIterator.from_iterable(iter(expected), buffer_size=2)
    time_chunks = DataChunkIterator.from_iterable(np.arange(5))
    series = TimeSeries('ts_name', data_chunks, 'A', timestamps=time_chunks)
    self.nwbfile.add_acquisition(series)

    with NWBHDF5IO(self.path, 'w') as writer:
        writer.write(self.nwbfile, cache_spec=False)

    with File(self.path, 'r') as h5file:
        stored = h5file['/acquisition/ts_name/data']
        self.assertListEqual(stored[:].tolist(), expected.tolist())
def test_dci_data(self):
    """Check a TimeSeries whose data is a DataChunkIterator over a scalar generator.

    Accessing ``num_samples`` must emit a ``UserWarning`` about the missing
    ``__len__`` and evaluate to ``None``; iterating the DataChunkIterator must then
    produce values close to those of a fresh generator.
    """

    def generator_factory():
        return (i for i in range(100))

    data = DataChunkIterator(data=generator_factory())
    ts1 = TimeSeries('test_ts1', data,
                     'grams', starting_time=0.0, rate=0.1)
    with self.assertWarnsRegex(UserWarning, r'The data attribute on this TimeSeries \(named: test_ts1\) has no '
                               '__len__'):
        self.assertIsNone(ts1.num_samples)
    for xi, yi in zip(data, generator_factory()):
        # Use a unittest assertion: a bare ``assert`` is removed under ``python -O``,
        # which would turn this loop into a no-op.
        self.assertTrue(np.allclose(xi, yi))
def test_dci_data_arr(self):
    """Check a TimeSeries whose data is a DataChunkIterator over an array generator.

    Each generated element is a two-item numpy array. Accessing ``num_samples``
    must emit a ``UserWarning`` and evaluate to ``None``; iterating the
    DataChunkIterator must then produce values close to those of a fresh generator.
    """

    def generator_factory():
        return (np.array([i, i+1]) for i in range(100))

    data = DataChunkIterator(data=generator_factory())
    ts1 = TimeSeries('test_ts1', data,
                     'grams', starting_time=0.0, rate=0.1)
    # NOTE: only the warning category is checked here; the message text is not matched.
    with self.assertWarns(UserWarning):
        self.assertIsNone(ts1.num_samples)
    for xi, yi in zip(data, generator_factory()):
        # Use a unittest assertion: a bare ``assert`` is removed under ``python -O``,
        # which would turn this loop into a no-op.
        self.assertTrue(np.allclose(xi, yi))
# Open the file and read the next chunk
newfp = np.memmap(filename, dtype=dtype, mode='r', shape=shape)
curr_data = newfp[i:(i + 1), ...][0]
del newfp # Reopen the file in each iteration to prevent accumulation of data in memory
yield curr_data
return
####################
# Step 2: Wrap the generator in a DataChunkIterator
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
#
from hdmf.data_utils import DataChunkIterator
# Wrap the generator: buffer 10 elements into a chunk, i.e., create chunks of shape (10,10).
data = DataChunkIterator(
    data=iter_largearray(
        filename='basic_sparse_iterwrite_testdata.npy',
        shape=datashape,
    ),
    maxshape=datashape,
    buffer_size=10,
)
####################
# Step 3: Write the data as usual
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
#
write_test_file(filename='basic_sparse_iterwrite_largearray.nwb', data=data)
####################
# .. tip::
#
def __init__(self, **kwargs):
    """Validate that data and timestamps have matching lengths, then delegate to the parent.

    The length comparison is skipped when either ``data`` or ``timestamps`` is a
    ``TimeSeries`` or a ``DataChunkIterator``.

    Raises:
        Exception: if both lengths are checked and they differ.
    """
    name, data, electrodes = popargs('name', 'data', 'electrodes', kwargs)
    timestamps = getargs('timestamps', kwargs)

    # TODO: add check when we have DataChunkIterators
    skip_length_check = any(
        isinstance(candidate, (TimeSeries, DataChunkIterator))
        for candidate in (data, timestamps)
    )
    if not skip_length_check and len(data) != len(timestamps):
        raise Exception('Must provide the same number of timestamps and spike events')

    super(SpikeEventSeries, self).__init__(name, data, electrodes, **kwargs)
while(x < 0.5 and num_chunks < max_chunks):
val = np.asarray([sin(random() * 2 * pi) for i in range(chunk_length)])
x = random()
num_chunks += 1
yield val
return
####################
# Step 2: Wrap the generator in a DataChunkIterator
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
#
from hdmf.data_utils import DataChunkIterator
data = DataChunkIterator(
    data=iter_sin(10),
)
####################
# Step 3: Write the data as usual
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
#
# The wrapped generator now supplies the data for a synthetic time series.
write_test_file(filename='basic_iterwrite_example.nwb', data=data)
####################
# Discussion
# ^^^^^^^^^^
# Note that here we do not actually know how long our time series will be.
print("maxshape=%s, recommended_data_shape=%s, dtype=%s" % (str(data.maxshape),
def __init__(self, **kwargs):
    """Validate that data and timestamps have matching lengths, then delegate to the parent.

    The length comparison is only performed when neither ``data`` nor
    ``timestamps`` is a ``TimeSeries`` or a ``DataChunkIterator``. The guards use
    ``or`` rather than ``and`` so that ``len()`` is not called when just one of the
    two arguments is such an object.

    Raises:
        Exception: if both lengths are checked and they differ.
    """
    name, data, electrodes = popargs('name', 'data', 'electrodes', kwargs)
    timestamps = getargs('timestamps', kwargs)
    if not (isinstance(data, TimeSeries) or isinstance(timestamps, TimeSeries)):
        if not (isinstance(data, DataChunkIterator) or isinstance(timestamps, DataChunkIterator)):
            if len(data) != len(timestamps):
                raise Exception('Must provide the same number of timestamps and spike events')
        else:
            # TODO: add check when we have DataChunkIterators
            pass
    super(SpikeEventSeries, self).__init__(name, data, electrodes, **kwargs)
for ch in curr_ids])
# Use one scalar conversion when every gain is identical, otherwise keep a
# per-channel conversion array. Both paths scale the gains by 1e-6.
unique_gains = np.unique(gains)
if len(unique_gains) != 1:
    scalar_conversion = 1.
    channel_conversion = gains * 1e-6
else:
    scalar_conversion = unique_gains[0] * 1e-6
    channel_conversion = None
def data_generator(recording, channels_ids):
    """Yield the flattened traces of one channel at a time.

    Args:
        recording: object providing ``get_traces(channel_ids=[...])`` whose result
            has a ``flatten()`` method.
        channels_ids: iterable of channel identifiers, processed in order.

    Yields:
        The flattened traces for each channel id, one chunk per channel.
    """
    # ``channel_id`` rather than ``id`` so the builtin is not shadowed.
    for channel_id in channels_ids:
        yield recording.get_traces(channel_ids=[channel_id]).flatten()
# Build the per-channel generator and wrap it in a DataChunkIterator.
data = data_generator(recording=recording, channels_ids=curr_ids)
# NOTE(review): iter_axis=1 presumably places each yielded channel along axis 1 of
# the dataset — confirm against the hdmf DataChunkIterator documentation.
ephys_data = DataChunkIterator(data=data, iter_axis=1)
acquisition_name = es['name']
# To get traces in Volts = data*channel_conversion*conversion
ephys_ts = ElectricalSeries(
name=acquisition_name,
data=ephys_data,
electrodes=electrode_table_region,
starting_time=recording.frame_to_time(0),
rate=rate,
conversion=scalar_conversion,
channel_conversion=channel_conversion,
comments='Generated from SpikeInterface::NwbRecordingExtractor',
# NOTE(review): this is a literal string, not a variable — confirm that is intended.
description='acquisition_description'
)
# Register the series as an acquisition on the NWB file object.
nwbfile.add_acquisition(ephys_ts)