# NOTE: these tests are methods of a unittest.TestCase; the surrounding class
# is not shown here, and `self.path` and `self.nwbfile` are assumed to be
# created in its setUp().
import numpy as np
from h5py import File
from pynwb import NWBHDF5IO, TimeSeries
from hdmf.backends.hdf5.h5_utils import H5DataIO
from hdmf.data_utils import DataChunkIterator


# Write a dataset with an explicit HDF5 chunk shape via H5DataIO.
def test_write_dataset_custom_chunks(self):
    a = H5DataIO(np.arange(30).reshape(5, 2, 3),
                 chunks=(1, 1, 3))
    ts = TimeSeries('ts_name', a, 'A', timestamps=np.arange(5))
    self.nwbfile.add_acquisition(ts)
    with NWBHDF5IO(self.path, 'w') as io:
        io.write(self.nwbfile, cache_spec=False)
    with File(self.path, 'r') as f:
        dset = f['/acquisition/ts_name/data']
        self.assertTrue(np.all(dset[:] == a.data))
        self.assertEqual(dset.chunks, (1, 1, 3))
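
# Stream data through a DataChunkIterator and apply gzip compression, byte
# shuffling, and Fletcher32 checksums in a single H5DataIO wrapper.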
def test_write_dataset_datachunkiterator_with_compression(self):
    a = np.arange(30).reshape(5, 2, 3)
    aiter = iter(a)
    daiter = DataChunkIterator.from_iterable(aiter, buffer_size=2)
    wrapped_daiter = H5DataIO(data=daiter,
                              compression='gzip',
                              compression_opts=5,
                              shuffle=True,
                              fletcher32=True)
    ts = TimeSeries('ts_name', wrapped_daiter, 'A', timestamps=np.arange(5))
    self.nwbfile.add_acquisition(ts)
    with NWBHDF5IO(self.path, 'w') as io:
        io.write(self.nwbfile, cache_spec=False)
    with File(self.path, 'r') as f:
        dset = f['/acquisition/ts_name/data']
        self.assertEqual(dset.shape, a.shape)
        self.assertListEqual(dset[:].tolist(), a.tolist())
        self.assertEqual(dset.compression, 'gzip')
        self.assertEqual(dset.compression_opts, 5)
        self.assertEqual(dset.shuffle, True)
        self.assertEqual(dset.fletcher32, True)
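
# A DataChunkIterator over a bare generator has no known length, so
# TimeSeries.num_samples should be None and a UserWarning is expected.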
def test_dataio_dci_data(self):

    def generator_factory():
        return (i for i in range(100))

    data = H5DataIO(DataChunkIterator(data=generator_factory()))
    ts1 = TimeSeries('test_ts1', data,
                     'grams', starting_time=0.0, rate=0.1)
    with self.assertWarnsRegex(UserWarning, r'The data attribute on this TimeSeries \(named: test_ts1\) has a '
                                            '__len__, but it cannot be read'):
        self.assertIs(ts1.num_samples, None)
    for xi, yi in zip(data, generator_factory()):
        assert np.allclose(xi, yi)
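
# Request a custom HDF5 fill value for the written dataset.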
def test_write_dataset_custom_fillvalue(self):
    a = H5DataIO(np.arange(20).reshape(5, 4), fillvalue=-1)
    ts = TimeSeries('ts_name', a, 'A', timestamps=np.arange(5))
    self.nwbfile.add_acquisition(ts)
    with NWBHDF5IO(self.path, 'w') as io:
        io.write(self.nwbfile, cache_spec=False)
    with File(self.path, 'r') as f:
        dset = f['/acquisition/ts_name/data']
        self.assertTrue(np.all(dset[:] == a.data))
        self.assertEqual(dset.fillvalue, -1)
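
# Apply gzip compression (level 5), shuffle, and Fletcher32 directly to an
# in-memory array, then verify the filter settings on the written dataset.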
def test_write_dataset_custom_compress(self):
    a = H5DataIO(np.arange(30).reshape(5, 2, 3),
                 compression='gzip',
                 compression_opts=5,
                 shuffle=True,
                 fletcher32=True)
    ts = TimeSeries('ts_name', a, 'A', timestamps=np.arange(5))
    self.nwbfile.add_acquisition(ts)
    with NWBHDF5IO(self.path, 'w') as io:
        io.write(self.nwbfile, cache_spec=False)
    with File(self.path, 'r') as f:
        dset = f['/acquisition/ts_name/data']
        self.assertTrue(np.all(dset[:] == a.data))
        self.assertEqual(dset.compression, 'gzip')
        self.assertEqual(dset.compression_opts, 5)
        self.assertEqual(dset.shuffle, True)
        self.assertEqual(dset.fletcher32, True)
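
# H5DataIO around a plain Python list keeps its length visible, so
# num_samples is known even before the data is written.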
def test_dataio_list_data(self):
    length = 100
    data = list(range(length))
    ts1 = TimeSeries('test_ts1', H5DataIO(data),
                     'grams', starting_time=0.0, rate=0.1)
    self.assertEqual(ts1.num_samples, length)
    assert data == list(ts1.data)
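
# H5DataIO can wrap the timestamps array too, not just the data.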
def test_gzip_timestamps(self):
    ts = TimeSeries('ts_name', [1, 2, 3], 'A',
                    timestamps=H5DataIO(np.array([1., 2., 3.]), compression='gzip'))
    self.nwbfile.add_acquisition(ts)
    with NWBHDF5IO(self.path, 'w') as io:
        io.write(self.nwbfile, cache_spec=False)
    # confirm that the dataset was indeed compressed
    with File(self.path, 'r') as f:
        self.assertEqual(f['/acquisition/ts_name/timestamps'].compression, 'gzip')
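
# ---------------------------------------------------------------------------
# Standalone sketch (not part of the original test suite): the same H5DataIO
# compression options used outside the test fixture. The file name,
# identifier, and series name below are illustrative, not fixed by the tests
# above.
if __name__ == '__main__':
    from datetime import datetime, timezone
    from pynwb import NWBFile

    nwbfile = NWBFile(session_description='H5DataIO demo',
                      identifier='demo-id',
                      session_start_time=datetime.now(timezone.utc))
    # Wrap the array so NWBHDF5IO forwards the filter settings to h5py.
    demo_data = H5DataIO(np.arange(100.0),
                         compression='gzip',
                         compression_opts=5)
    nwbfile.add_acquisition(TimeSeries('demo_ts', demo_data, 'A',
                                       starting_time=0.0, rate=1.0))
    with NWBHDF5IO('demo.nwb', 'w') as io:
        io.write(nwbfile)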