def test_index_dataset_with_location(index, default_metadata_type, driver):
    """
    :type index: datacube.index._api.Index
    :type default_metadata_type: datacube.model.MetadataType
    """
    first_file = Path('/tmp/first/something.yaml').absolute()
    second_file = Path('/tmp/second/something.yaml').absolute()
    first_uri = driver.as_uri(first_file)
    second_uri = driver.as_uri(second_file)

    type_ = index.products.add_document(_pseudo_telemetry_dataset_type)
    dataset = Dataset(type_, _telemetry_dataset, uris=[first_uri], sources={})
    index.datasets.add(dataset)
    stored = index.datasets.get(dataset.id)

    assert stored.id == _telemetry_uuid
    # TODO: Dataset types?
    assert stored.type.id == type_.id
    assert stored.metadata_type.id == default_metadata_type.id
    if driver.uri_scheme == 'file':
        assert stored.local_path == Path(first_file)
    else:
        assert stored.local_path is None

    # Ingesting again should have no effect.
    index.datasets.add(dataset)
    stored = index.datasets.get(dataset.id)
    locations = index.datasets.get_locations(dataset.id)
    assert len(locations) == 1

def test_index_dataset_with_sources(index, default_metadata_type):
    type_ = index.products.add_document(_pseudo_telemetry_dataset_type)

    parent_doc = _telemetry_dataset.copy()
    parent = Dataset(type_, parent_doc, None, sources={})

    child_doc = _telemetry_dataset.copy()
    child_doc['lineage'] = {'source_datasets': {'source': _telemetry_dataset}}
    child_doc['id'] = '051a003f-5bba-43c7-b5f1-7f1da3ae9cfb'
    child = Dataset(type_, child_doc, local_uri=None, sources={'source': parent})

    with pytest.raises(MissingRecordError):
        index.datasets.add(child, sources_policy='skip')

    index.datasets.add(child, sources_policy='ensure')
    assert index.datasets.get(parent.id)
    assert index.datasets.get(child.id)

    index.datasets.add(child, sources_policy='skip')
    index.datasets.add(child, sources_policy='ensure')
    index.datasets.add(child, sources_policy='verify')
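
    # A hedged reading of the sources_policy values exercised above, inferred from the
    # assertions rather than from the library documentation: 'skip' does not index
    # missing source datasets, which is why adding the child before its parent exists
    # raises MissingRecordError; 'ensure' indexes the parent together with the child;
    # once everything is indexed, re-adding with 'skip', 'ensure' or 'verify' succeeds.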

    # Deprecated property, but it should still work until we remove it completely.
    # update with the same doc should do nothing
    index.datasets.update(dataset)
    updated = index.datasets.get(dataset.id)
    print('>>>>', updated.local_uri)
    if driver.uri_scheme == 'file':
        assert updated.local_uri == 'file:///test/doc.yaml'
    else:
        assert updated.local_uri is None
    assert updated.uris == ['%s:///test/doc.yaml' % driver.uri_scheme]

    # update location
    if driver.uri_scheme == 'file':
        assert index.datasets.get(dataset.id).local_uri == 'file:///test/doc.yaml'
    else:
        assert updated.local_uri is None
    update = Dataset(ls5_telem_type, example_ls5_nbar_metadata_doc,
                     uris=['%s:///test/doc2.yaml' % driver.uri_scheme],
                     sources={})
    index.datasets.update(update)
    updated = index.datasets.get(dataset.id)

    # New locations are appended on update.
    # They may be indexing the same dataset from a different location:
    # we don't want to remove the original location.
    # Returns the most recently added.
    if driver.uri_scheme == 'file':
        assert updated.local_uri == 'file:///test/doc2.yaml'
    else:
        assert updated.local_uri is None
    # But both still exist (newest-to-oldest order)
    assert updated.uris == ['%s:///test/doc2.yaml' % driver.uri_scheme,
                            '%s:///test/doc.yaml' % driver.uri_scheme]

    # adding more metadata should always be allowed

def _build_dataset(doc):
    sources = {name: _build_dataset(src) for name, src in doc['lineage']['source_datasets'].items()}
    return Dataset(_EXAMPLE_DATASET_TYPE, doc, uris=['file://test.zzz'], sources=sources)
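
# Hypothetical usage sketch for _build_dataset (not part of the original snippet):
# given a metadata document whose lineage nests source documents, it recursively
# builds a Dataset whose sources mirror that tree. The document below is invented
# purely for illustration.
_example_doc = {
    'id': '00000000-0000-0000-0000-000000000000',            # made-up identifier
    'lineage': {'source_datasets': {
        'level1': {
            'id': '11111111-1111-1111-1111-111111111111',    # made-up identifier
            'lineage': {'source_datasets': {}},
        },
    }},
}
_example_ds = _build_dataset(_example_doc)   # Dataset built with its 'level1' source attached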
"format": {"name": "GeoTiff"},
"image": {
"bands": {
'green': {
'type': 'reflective',
'cell_size': 25.0,
'path': example_gdal_path,
'label': 'Coastal Aerosol',
'number': '1',
},
}
}
}
# Without new band attribute, default to band number 1
d = Dataset(_EXAMPLE_DATASET_TYPE, defn, uris=['file:///tmp'])
ds = RasterDatasetDataSource(BandInfo(d, 'green'))
bandnum = ds.get_bandnumber(None)
assert bandnum == 1
with ds.open() as foo:
data = foo.read()
assert isinstance(data, np.ndarray)
#############
# With new 'image.bands.[band].band' attribute
band_num = 3
defn['image']['bands']['green']['band'] = band_num
d = Dataset(_EXAMPLE_DATASET_TYPE, defn, uris=['file:///tmp'])
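
# Hedged continuation (the original snippet is truncated here): with the 'band'
# attribute set on the green band, the same calls as above would be expected to
# report that band number instead of the default.
ds = RasterDatasetDataSource(BandInfo(d, 'green'))
assert ds.get_bandnumber(None) == band_num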

def test_update_dataset(index, ls5_telem_doc, example_ls5_nbar_metadata_doc, driver):
    """
    :type index: datacube.index._api.Index
    """
    ls5_telem_type = index.products.add_document(ls5_telem_doc)
    assert ls5_telem_type

    example_ls5_nbar_metadata_doc['lineage']['source_datasets'] = {}
    dataset = Dataset(ls5_telem_type, example_ls5_nbar_metadata_doc,
                      uris=['%s:///test/doc.yaml' % driver.uri_scheme],
                      sources={})
    dataset = index.datasets.add(dataset)
    assert dataset

    # update with the same doc should do nothing
    index.datasets.update(dataset)
    updated = index.datasets.get(dataset.id)
    print('>>>>', updated.local_uri)
    if driver.uri_scheme == 'file':
        assert updated.local_uri == 'file:///test/doc.yaml'
    else:
        assert updated.local_uri is None
    assert updated.uris == ['%s:///test/doc.yaml' % driver.uri_scheme]

    # update location

def test_index_dataset_with_location(index: Index, default_metadata_type: MetadataType):
    first_file = Path('/tmp/first/something.yaml').absolute()
    second_file = Path('/tmp/second/something.yaml').absolute()

    type_ = index.products.add_document(_pseudo_telemetry_dataset_type)
    dataset = Dataset(type_, _telemetry_dataset, uris=[first_file.as_uri()], sources={})
    index.datasets.add(dataset)
    stored = index.datasets.get(dataset.id)

    assert stored.id == _telemetry_uuid
    # TODO: Dataset types?
    assert stored.type.id == type_.id
    assert stored.metadata_type.id == default_metadata_type.id
    assert stored.local_path == Path(first_file)

    # Ingesting again should have no effect.
    index.datasets.add(dataset)
    stored = index.datasets.get(dataset.id)
    locations = index.datasets.get_locations(dataset.id)
    assert len(locations) == 1

    # Remove the location
    was_removed = index.datasets.remove_location(dataset.id, first_file.as_uri())
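
    # Hedged continuation (not shown in the snippet above): removing the only known
    # location would be expected to report success and leave no locations behind.
    assert was_removed
    assert len(index.datasets.get_locations(dataset.id)) == 0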

    def extent(self):
        # Delegate to the Dataset.extent property via the descriptor protocol.
        return Dataset.extent.__get__(self, Dataset)

def resolve_ds(ds, sources, cache=None):
    # Note: uri, main_uuid, db_dss, match_product and with_cache come from the
    # enclosing scope in the original code; this is a nested helper.
    cached = cache.get(ds.id)
    if cached is not None:
        return cached

    uris = [uri] if ds.id == main_uuid else []
    doc = ds.doc

    db_ds = db_dss.get(ds.id)
    if db_ds:
        product = db_ds.type
    else:
        product = match_product(doc)

    return with_cache(Dataset(product, doc, uris=uris, sources=sources), ds.id, cache)

def doc2ds(doc, products):
    if doc is None:
        return None

    p = products.get(doc['product'], None)
    if p is None:
        raise ValueError('No product named: %s' % doc['product'])
    return Dataset(p, doc['metadata'], uris=doc['uris'])
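
# Hypothetical usage sketch for doc2ds (names and values invented for illustration):
# 'products' is assumed to be a dict-like registry mapping product names to product
# objects, and the serialised doc carries the product name, the metadata document
# and the dataset's uris.
serialised = {
    'product': 'example_product',                                  # assumed name
    'metadata': {'id': '22222222-2222-2222-2222-222222222222'},   # minimal metadata doc
    'uris': ['file:///tmp/example-dataset.yaml'],
}
dataset = doc2ds(serialised, products)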