def test_update_coords_metadata(self):
    edit_metadata(TEST_CUBE_ZARR, metadata_path=TEST_NEW_META_YML, update_coords=True,
                  in_place=False, output_path=TEST_CUBE_ZARR_EDIT, monitor=print)
    ds1 = zarr.open(TEST_CUBE_ZARR)
    ds2 = zarr.open(TEST_CUBE_ZARR_EDIT)
    self.assertEqual(len(ds1), len(ds2))
    self.assertEqual(ds1.attrs['start_date'], ds2.attrs['start_date'])
    self.assertEqual('happiness', ds2['conc_chl'].attrs['units'])
    self.assertIn('geospatial_lon_units', ds2.attrs.keys())
    self.assertEqual('degrees_east', ds2.attrs['geospatial_lon_units'])
    self.assertNotIn('geospatial_lat_max', ds1.attrs.keys())
    self.assertNotIn('geospatial_lat_max', ds2.attrs.keys())
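These assertions rely on a zarr group behaving like a mapping: len() counts its member arrays and .attrs is a mutable dict-like. A minimal self-contained sketch of that pattern, using an in-memory group rather than the test cubes:

import zarr

g = zarr.group()                      # in-memory group, standing in for a cube on disk
g.attrs['start_date'] = '2010-01-01'  # .attrs is a mutable mapping
assert g.attrs['start_date'] == '2010-01-01'
g.create_dataset('conc_chl', shape=(4, 4), dtype='f4')
g['conc_chl'].attrs['units'] = 'happiness'  # per-variable attributes work the same way
assert len(g) == 1                    # len() counts the group's members
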
def test_user_output(self):
    result = self.invoke_cli(['edit', TEST_CUBE_ZARR, '-M', TEST_NEW_META_YML, '-o', TEST_CUBE_ZARR_EDIT])
    self.assertEqual(0, result.exit_code)
    ds1 = zarr.open(TEST_CUBE_ZARR)
    ds2 = zarr.open(TEST_CUBE_ZARR_EDIT)
    self.assertEqual(len(ds1), len(ds2))
    self.assertEqual(ds1.attrs['start_date'], ds2.attrs['start_date'])
    self.assertEqual('happiness', ds2['conc_chl'].attrs['units'])
    self.assertNotIn('creator_name', ds1.attrs.keys())
    self.assertIn('creator_name', ds2.attrs.keys())
def test_zero_sequence_length(self):
    # Mangle a sample data file to force a zero sequence length.
    ts = msprime.simulate(10, mutation_rate=2, random_seed=5)
    with tempfile.TemporaryDirectory(prefix="tsinf_format_test") as tempdir:
        filename = os.path.join(tempdir, "samples.tmp")
        with tsinfer.SampleData(path=filename) as sample_data:
            for var in ts.variants():
                sample_data.add_site(var.site.position, var.genotypes)
        # Reopen the underlying LMDB store and patch the attribute in place;
        # "a" is zarr's documented mode for read-write access to an existing store.
        store = zarr.LMDBStore(filename, subdir=False)
        data = zarr.open(store=store, mode="a")
        data.attrs["sequence_length"] = 0
        store.close()
        sample_data = tsinfer.load(filename)
        self.assertEqual(sample_data.sequence_length, 0)
        self.assertRaises(ValueError, tsinfer.generate_ancestors, sample_data)
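The test patches a finalised SampleData file by reopening its store directly. The same reopen-and-patch pattern, sketched with an in-memory store so it runs standalone (values illustrative):

import zarr

store = zarr.MemoryStore()
root = zarr.open_group(store=store, mode='a')  # create an empty group in the store
root.attrs['sequence_length'] = 1234.0
patched = zarr.open(store=store, mode='a')     # reopen: resolves to the existing group
patched.attrs['sequence_length'] = 0
assert zarr.open(store=store, mode='r').attrs['sequence_length'] == 0
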
def test_update_coords_only(self):
    ds1 = zarr.open(TEST_CUBE_ZARR_COORDS)
    delete_list = ['geospatial_lat_max', 'geospatial_lat_min', 'geospatial_lat_units',
                   'geospatial_lon_max', 'geospatial_lon_min', 'geospatial_lon_units',
                   'time_coverage_end', 'time_coverage_start']
    # Iterate over a snapshot of the keys: deleting from ds1.attrs while
    # iterating over it directly can invalidate the iterator.
    for attr in list(ds1.attrs.keys()):
        if attr in delete_list:
            del ds1.attrs[attr]
    result = self.invoke_cli(['edit', TEST_CUBE_ZARR_COORDS, '-o', TEST_CUBE_ZARR_EDIT, '-C'])
    self.assertEqual(0, result.exit_code)
    ds1 = zarr.open(TEST_CUBE_ZARR_COORDS)
    ds2 = zarr.open(TEST_CUBE_ZARR_EDIT)
    for attr in delete_list:
        self.assertNotIn(attr, ds1.attrs.keys())
    self.assertEqual(len(ds1), len(ds2))
    self.assertIn('geospatial_lat_max', ds2.attrs.keys())
    self.assertIn('geospatial_lat_min', ds2.attrs.keys())
    self.assertIn('geospatial_lat_resolution', ds2.attrs.keys())
    self.assertIn('geospatial_lat_units', ds2.attrs.keys())
    self.assertIn('geospatial_lon_max', ds2.attrs.keys())
    self.assertEqual(180.0, ds2.attrs['geospatial_lon_max'])
    self.assertEqual(-180.0, ds2.attrs['geospatial_lon_min'])
    self.assertEqual('2010-01-04T00:00:00.000000000', ds2.attrs['time_coverage_end'])
    self.assertEqual('2010-01-01T00:00:00.000000000', ds2.attrs['time_coverage_start'])
    self.assertEqual('degrees_east', ds2.attrs['geospatial_lon_units'])
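Conceptually, the -C (update coords) flag recomputes geospatial and temporal attributes from the coordinate arrays themselves. An illustrative sketch of that derivation, not xcube's actual implementation:

import numpy as np

lon = np.linspace(-180.0, 180.0, 361)
time = np.array(['2010-01-01T00:00:00', '2010-01-04T00:00:00'],
                dtype='datetime64[ns]')
derived = {
    'geospatial_lon_min': float(lon.min()),  # -180.0
    'geospatial_lon_max': float(lon.max()),  # 180.0
    'geospatial_lon_units': 'degrees_east',
    'time_coverage_start': str(time.min()),  # '2010-01-01T00:00:00.000000000'
    'time_coverage_end': str(time.max()),    # '2010-01-04T00:00:00.000000000'
}
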
self.n_answers = len(self.a_itow) + 1
if train:
    # Training uses the concatenated train and val question sets.
    with open(os.path.join(data_dir, 'vqa_train_final_3000.json')) as f_train, \
            open(os.path.join(data_dir, 'vqa_val_final_3000.json')) as f_val:
        self.vqa = json.load(f_train) + json.load(f_val)
    self.i_feat = zarr.open(os.path.join(data_dir, 'trainval.zarr'), mode='r')
    self.bbox = zarr.open(os.path.join(data_dir, 'trainval_boxes.zarr'), mode='r')
    self.sizes = pd.read_csv(os.path.join(data_dir, 'trainval_image_size.csv'))
else:
    with open(os.path.join(data_dir, 'vqa_test_toked.json')) as f_test:
        self.vqa = json.load(f_test)
    self.i_feat = zarr.open(os.path.join(data_dir, 'test.zarr'), mode='r')
    self.bbox = zarr.open(os.path.join(data_dir, 'test_boxes.zarr'), mode='r')
    self.sizes = pd.read_csv(os.path.join(data_dir, 'test_image_size.csv'))
self.n_questions = len(self.vqa)
print('Loading done')
# Per-region image feature size plus 4 bounding-box coordinates.
self.feat_dim = self.i_feat[list(self.i_feat.keys())[0]].shape[1] + 4
self.init_pretrained_wemb(emb_dim)
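A hypothetical __getitem__-style sketch for a loader like the one above: fetch one question's image features and boxes by image id and stitch them together. The key name 'image_id' and the layout are assumptions for illustration, not the repository's verified schema:

import numpy as np

def get_example(self, idx):
    q = self.vqa[idx]
    iid = str(q['image_id'])              # assumed key, for illustration only
    feats = np.asarray(self.i_feat[iid])  # (n_regions, feat_dim - 4)
    boxes = np.asarray(self.bbox[iid])    # (n_regions, 4)
    # Appending box coordinates matches feat_dim = feature size + 4 above.
    return np.concatenate([feats, boxes], axis=1), q
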
def open_file(zarr_file):
    """Open a zarr file read-only and return it with its extension-less path."""
    return zarr.open(zarr_file, mode='r'), os.path.splitext(zarr_file)[0]
def _open_readonly(self):
    if self.path is not None:
        store = self._open_lmbd_readonly()
    else:
        # This happens when we finalise an in-memory container.
        store = self.data.store
    self.data = zarr.open(store=store, mode="r")
    self._check_format()
    self._mode = self.READ_MODE
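The read-only reopen above relies on zarr refusing writes in mode "r". A standalone sketch of that behaviour with an in-memory store:

import zarr

store = zarr.MemoryStore()
data = zarr.open_group(store=store, mode='a')  # writable while building
data.attrs['format_name'] = 'demo'
data = zarr.open(store=store, mode='r')        # reopen the same store read-only
try:
    data.attrs['format_name'] = 'other'
except PermissionError as err:                 # zarr's read-only error
    print('write rejected:', err)
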
def read_zarr_dataset(filename):
    """Read a zarr dataset, which may be a single array or a group of arrays.

    Parameters
    ----------
    filename : str
        Path to file ending in '.zarr'. The file can contain either an array
        or a group of arrays in the case of pyramid data.

    Returns
    -------
    image : array-like
        Array or list of arrays.
    shape : tuple
        Shape of the array, or of the first array in the list.
    """
    zr = zarr.open(filename, mode='r')
    if isinstance(zr, zarr.core.Array):
        # A single zarr array: wrap it lazily as a dask array.
        image = da.from_zarr(filename)
        shape = image.shape
    else:
        # A zarr group: load every array inside it, useful for pyramid data.
        image = [da.from_zarr(filename, component=c) for c, _ in zr.arrays()]
        shape = image[0].shape
    return image, shape
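A usage sketch for read_zarr_dataset: build a tiny two-level "pyramid" group and read it back (the path is illustrative; dask must be importable as da for the function itself):

import zarr

grp = zarr.open_group('pyramid.zarr', mode='w')
grp.create_dataset('0', shape=(8, 8), dtype='f4')
grp.create_dataset('1', shape=(4, 4), dtype='f4')

image, shape = read_zarr_dataset('pyramid.zarr')
print(len(image), shape)  # 2 levels; shape of the highest-resolution array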