def test_update_coords_metadata(self):
    edit_metadata(TEST_CUBE_ZARR, metadata_path=TEST_NEW_META_YML, update_coords=True, in_place=False,
                  output_path=TEST_CUBE_ZARR_EDIT, monitor=print)
    ds1 = zarr.open(TEST_CUBE_ZARR)
    ds2 = zarr.open(TEST_CUBE_ZARR_EDIT)
    self.assertEqual(len(ds1), len(ds2))
    self.assertEqual(ds1.attrs['start_date'], ds2.attrs['start_date'])
    self.assertEqual('happiness', ds2['conc_chl'].attrs['units'])
    self.assertIn('geospatial_lon_units', ds2.attrs.keys())
    self.assertEqual('degrees_east', ds2.attrs['geospatial_lon_units'])
    self.assertNotIn('geospatial_lat_max', ds1.attrs.keys())
    self.assertNotIn('geospatial_lat_max', ds2.attrs.keys())
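# Hedged aside, not part of the original tests: edit_metadata is shown above writing
# the edited cube to a separate output path; presumably the same call can modify the
# cube in place when in_place=True and no output_path is given. The import path below
# is an assumption about where the helper lives, not something shown in this excerpt.
from xcube.core.edit import edit_metadata  # assumed module path

edit_metadata(TEST_CUBE_ZARR, metadata_path=TEST_NEW_META_YML,
              update_coords=True, in_place=True, monitor=print)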
def test_user_output(self):
    result = self.invoke_cli(['edit', TEST_CUBE_ZARR, '-M', TEST_NEW_META_YML, '-o', TEST_CUBE_ZARR_EDIT])
    self.assertEqual(0, result.exit_code)
    ds1 = zarr.open(TEST_CUBE_ZARR)
    ds2 = zarr.open(TEST_CUBE_ZARR_EDIT)
    self.assertEqual(len(ds1), len(ds2))
    self.assertEqual(ds1.attrs['start_date'], ds2.attrs['start_date'])
    self.assertEqual('happiness', ds2['conc_chl'].attrs['units'])
    self.assertNotIn('creator_name', ds1.attrs.keys())
    self.assertIn('creator_name', ds2.attrs.keys())
def test_zero_sequence_length(self):
    # Mangle a sample data file to force a zero sequence length.
    ts = msprime.simulate(10, mutation_rate=2, random_seed=5)
    with tempfile.TemporaryDirectory(prefix="tsinf_format_test") as tempdir:
        filename = os.path.join(tempdir, "samples.tmp")
        with tsinfer.SampleData(path=filename) as sample_data:
            for var in ts.variants():
                sample_data.add_site(var.site.position, var.genotypes)
        store = zarr.LMDBStore(filename, subdir=False)
        data = zarr.open(store=store, mode="w+")
        data.attrs["sequence_length"] = 0
        store.close()
        sample_data = tsinfer.load(filename)
        self.assertEqual(sample_data.sequence_length, 0)
        self.assertRaises(ValueError, tsinfer.generate_ancestors, sample_data)
def test_update_coords_only(self):
    ds1 = zarr.open(TEST_CUBE_ZARR_COORDS)
    delete_list = ['geospatial_lat_max', 'geospatial_lat_min', 'geospatial_lat_units', 'geospatial_lon_max',
                   'geospatial_lon_min', 'geospatial_lon_units', 'time_coverage_end', 'time_coverage_start']
    for attr in list(ds1.attrs.keys()):
        if attr in delete_list:
            del ds1.attrs[attr]
    result = self.invoke_cli(['edit', TEST_CUBE_ZARR_COORDS, '-o', TEST_CUBE_ZARR_EDIT, '-C'])
    self.assertEqual(0, result.exit_code)
    ds1 = zarr.open(TEST_CUBE_ZARR_COORDS)
    ds2 = zarr.open(TEST_CUBE_ZARR_EDIT)
    for attr in delete_list:
        self.assertNotIn(attr, ds1.attrs.keys())
    self.assertEqual(len(ds1), len(ds2))
    self.assertIn('geospatial_lat_max', ds2.attrs.keys())
    self.assertIn('geospatial_lat_min', ds2.attrs.keys())
    self.assertIn('geospatial_lat_resolution', ds2.attrs.keys())
    self.assertIn('geospatial_lat_units', ds2.attrs.keys())
    self.assertIn('geospatial_lon_max', ds2.attrs.keys())
    self.assertEqual(180.0, ds2.attrs['geospatial_lon_max'])
    self.assertEqual(-180.0, ds2.attrs['geospatial_lon_min'])
    self.assertEqual('2010-01-04T00:00:00.000000000', ds2.attrs['time_coverage_end'])
    self.assertEqual('2010-01-01T00:00:00.000000000', ds2.attrs['time_coverage_start'])
    self.assertEqual('degrees_east', ds2.attrs['geospatial_lon_units'])
def verify_dtypes(self, chunk_size=None):
    n = 100
    if chunk_size is None:
        chunk_size = 100
    dtypes = [np.int8, np.uint8, np.int32, np.uint32, np.float64, np.float32]
    source = {
        str(dtype): zarr.array(np.arange(n, dtype=dtype), chunks=(chunk_size,))
        for dtype in dtypes
    }
    dest = self.verify_round_trip(source)
    for dtype in dtypes:
        self.assertEqual(dest[str(dtype)].dtype, dtype)
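# The verify_round_trip helper itself is not part of this excerpt. A minimal
# hypothetical stand-in, assuming it only needs to copy each source array into
# a fresh destination group and hand that group back for dtype/content checks:
import numpy as np
import zarr

def verify_round_trip(source):
    dest = zarr.group()  # in-memory destination group
    for name, src in source.items():
        dst = dest.array(name, src[:], chunks=src.chunks, dtype=src.dtype)
        np.testing.assert_array_equal(src[:], dst[:])  # contents survive the copy
    return dest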
def test_3d_array(self):
    a = zarr.array(np.arange(27).reshape((3, 3, 3)))
    self.verify_round_trip({"a": a})
def test_2d_array_chunk_size_5_10(self):
    a = zarr.array(np.arange(100).reshape((10, 10)), chunks=(5, 10))
    self.verify_round_trip({"a": a})
def test_2d_array(self):
    a = zarr.array(np.arange(100).reshape((10, 10)))
    self.verify_round_trip({"a": a})
def test_3d_array_chunks_size_1_1_1(self):
    a = zarr.array(np.arange(27).reshape((3, 3, 3)), chunks=(1, 1, 1))
    self.verify_round_trip({"a": a})
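# Quick sketch of the chunking argument exercised above: zarr.array copies a
# NumPy array into a chunked zarr array, and chunks/nchunks report the layout.
import numpy as np
import zarr

a = zarr.array(np.arange(27).reshape((3, 3, 3)), chunks=(1, 1, 1))
print(a.shape, a.chunks, a.nchunks)  # (3, 3, 3) (1, 1, 1) 27
print(np.array_equal(a[:], np.arange(27).reshape((3, 3, 3))))  # True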
positions = np.array([v.position for v in ts.variants()])
S = np.zeros((ts.sample_size, ts.num_mutations), dtype="u1")
for variant in ts.variants():
    S[:, variant.index] = variant.genotypes
G = S.astype(np.uint8).T
# Create the ancestors
input_root = zarr.group()
tsinfer.InputFile.build(
    input_root, genotypes=G,
    # genotype_qualities=tsinfer.proba_to_phred(error_probability),
    position=positions,
    recombination_rate=rho, sequence_length=ts.sequence_length,
    compress=False)
ancestors_root = zarr.group()
# tsinfer.extract_ancestors(ts, ancestors_root)
tsinfer.build_simulated_ancestors(input_root, ancestors_root, ts)
ancestors_ts = tsinfer.match_ancestors(input_root, ancestors_root)
assert ancestors_ts.sequence_length == ts.num_sites
inferred_ts = tsinfer.match_samples(
    input_root, ancestors_ts, method="C",
    simplify=False)
print("inferred num_edges =", inferred_ts.num_edges)