# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): orphaned fragment — the enclosing test function's `def` line is
# not visible in this chunk, so the code is kept byte-identical; comments only.
# Names `clirunner`, `ds`, `ds_bad1`, `index`, `p` come from the missing context.

# `dataset search` should list the indexed dataset and its lineage ids,
# but not the dataset that failed to index (ds_bad1).
r = clirunner(['dataset', 'search'], expect_success=True)
assert ds.id in r.output
assert ds_bad1.id not in r.output
assert ds.sources['ab'].id in r.output
assert ds.sources['ac'].sources['cd'].id in r.output
# `dataset info` in csv format should mention the dataset id.
r = clirunner(['dataset', 'info', '-f', 'csv', ds.id])
assert ds.id in r.output
# With --show-sources, the yaml output should include source dataset ids.
r = clirunner(['dataset', 'info', '-f', 'yaml', '--show-sources', ds.id])
assert ds.sources['ae'].id in r.output
# With --show-derived on a source id, the derived dataset id should appear.
r = clirunner(['dataset', 'info', '-f', 'yaml', '--show-derived', ds.sources['ae'].id])
assert ds.id in r.output
# Regenerating the same DAG deterministically yields the same top-level id.
ds_ = SimpleDocNav(gen_dataset_test_dag(1, force_tree=True))
assert ds_.id == ds.id
# Fetching with include_sources=True should reconstruct the lineage tree.
x = index.datasets.get(ds.id, include_sources=True)
assert str(x.sources['ab'].id) == ds.sources['ab'].id
assert str(x.sources['ac'].sources['cd'].id) == ds.sources['ac'].sources['cd'].id
# Delegate remaining scenarios to dedicated check helpers.
check_skip_lineage_test(clirunner, index)
check_no_product_match(clirunner, index)
check_with_existing_lineage(clirunner, index)
check_inconsistent_lineage(clirunner, index)
check_missing_metadata_doc(clirunner)
check_missing_lineage(clirunner, index)
check_no_confirm(clirunner, p.datasets)
check_bad_yaml(clirunner, index)
# check --product=nosuchproduct
# NOTE(review): orphaned fragment — duplicates the body of test_dedup defined
# later in this file; enclosing `def` is not visible here. Code kept byte-identical.
# make sure ds0 has duplicate C nodes with equivalent data
assert ds0.sources['ab'].sources['bc'].doc is not ds0.sources['ac'].doc
assert ds0.sources['ab'].sources['bc'].doc == ds0.sources['ac'].doc
# dedup_lineage should collapse the equivalent duplicates to one shared object.
ds = SimpleDocNav(dedup_lineage(ds0))
assert ds.sources['ab'].sources['bc'].doc is ds.sources['ac'].doc
assert ds.sources['ab'].sources['bc'].sources['cd'].doc is ds.sources['ac'].sources['cd'].doc
# again but with raw doc
ds = SimpleDocNav(dedup_lineage(ds0.doc))
assert ds.sources['ab'].sources['bc'].doc is ds.sources['ac'].doc
assert ds.sources['ab'].sources['bc'].sources['cd'].doc is ds.sources['ac'].sources['cd'].doc
# Test that we detect inconsistent metadata for duplicate entries
ds0 = SimpleDocNav(gen_dataset_test_dag(3, force_tree=True))
ds0.sources['ac'].doc['label'] = 'Modified'
ds0 = SimpleDocNav(ds0.doc)
assert ds0.sources['ab'].sources['bc'].doc != ds0.sources['ac'].doc
with pytest.raises(InvalidDocException, match=r'Inconsistent metadata .*'):
    dedup_lineage(ds0)
# Test that we detect inconsistent lineage subtrees for duplicate entries
# Subtest 1: different set of keys
ds0 = SimpleDocNav(gen_dataset_test_dag(7, force_tree=True))
srcs = toolz.get_in(ds0.sources_path, ds0.sources['ac'].doc)
assert 'cd' in srcs
# Emptying one duplicate's sources makes the key sets diverge.
srcs['cd'] = {}
ds0 = SimpleDocNav(ds0.doc)
def test_dedup():
    """dedup_lineage must collapse equivalent duplicate lineage nodes into a
    single shared document, and reject duplicates whose metadata disagrees.

    Fix: the original body was de-indented to column 0, which is a SyntaxError;
    indentation restored, statements unchanged.
    """
    ds0 = SimpleDocNav(gen_dataset_test_dag(1, force_tree=True))

    # make sure ds0 has duplicate C nodes with equivalent data
    assert ds0.sources['ab'].sources['bc'].doc is not ds0.sources['ac'].doc
    assert ds0.sources['ab'].sources['bc'].doc == ds0.sources['ac'].doc

    # After dedup, the equivalent duplicates must be the same object.
    ds = SimpleDocNav(dedup_lineage(ds0))
    assert ds.sources['ab'].sources['bc'].doc is ds.sources['ac'].doc
    assert ds.sources['ab'].sources['bc'].sources['cd'].doc is ds.sources['ac'].sources['cd'].doc

    # again but with raw doc
    ds = SimpleDocNav(dedup_lineage(ds0.doc))
    assert ds.sources['ab'].sources['bc'].doc is ds.sources['ac'].doc
    assert ds.sources['ab'].sources['bc'].sources['cd'].doc is ds.sources['ac'].sources['cd'].doc

    # Test that we detect inconsistent metadata for duplicate entries
    ds0 = SimpleDocNav(gen_dataset_test_dag(3, force_tree=True))
    ds0.sources['ac'].doc['label'] = 'Modified'
    ds0 = SimpleDocNav(ds0.doc)
    assert ds0.sources['ab'].sources['bc'].doc != ds0.sources['ac'].doc

    with pytest.raises(InvalidDocException, match=r'Inconsistent metadata .*'):
        dedup_lineage(ds0)
# NOTE(review): this function is truncated — the write_files() call and its
# triple-quoted YAML string are never closed in this chunk. Code kept byte-identical.
def test_dataset_add_inconsistent_measurements(dataset_add_configs, index_empty, clirunner):
# NOTE(review): body below is de-indented to column 0 in the source; left as-is.
p = dataset_add_configs
index = index_empty
mk = dataset_maker(0)
# not set, empty, subset, full set, super-set
ds1 = SimpleDocNav(mk('A', product_type='eo', ))
ds2 = SimpleDocNav(mk('B', product_type='eo', measurements={}))
ds3 = SimpleDocNav(mk('C', product_type='eo', measurements={
'red': {}
}))
ds4 = SimpleDocNav(mk('D', product_type='eo', measurements={
'red': {},
'green': {},
}))
ds5 = SimpleDocNav(mk('E', product_type='eo', measurements={
'red': {},
'green': {},
'extra': {},
}))
# Collect all five variants and their raw docs for indexing.
dss = (ds1, ds2, ds3, ds4, ds5)
docs = [ds.doc for ds in dss]
prefix = write_files({
'products.yml': '''
name: eo
description: test product
def test_dataset_maker():
    """dataset_maker(seed) must be deterministic for a given seed, produce
    distinct ids/timestamps across seeds, and support nesting via sources.

    Fix: the original body was de-indented to column 0, which is a SyntaxError;
    indentation restored, statements unchanged.
    """
    mk = dataset_maker(0)
    # Same maker + same name is deterministic.
    assert mk('aa') == mk('aa')

    a = SimpleDocNav(mk('A'))
    b = SimpleDocNav(mk('B'))
    assert a.id != b.id
    # Same seed -> same creation timestamp across datasets.
    assert a.doc['creation_dt'] == b.doc['creation_dt']
    assert isinstance(a.id, str)
    assert a.sources == {}

    # Different seeds produce different ids and timestamps.
    a1, a2 = [dataset_maker(i)('A', product_type='eo') for i in (0, 1)]
    assert a1['id'] != a2['id']
    assert a1['creation_dt'] != a2['creation_dt']
    assert a1['product_type'] == 'eo'

    # Source docs are embedded by reference, not copied.
    c = SimpleDocNav(mk('C', sources=dict(a=a.doc, b=b.doc)))
    assert c.sources['a'].doc is a.doc
    assert c.sources['b'].doc is b.doc
# NOTE(review): this function is truncated — the `expect_preorder` triple-quoted
# string is never closed in this chunk and the body is de-indented to column 0.
# Code kept byte-identical; comments only.
def test_simple_doc_nav():
    """
    A -> B
    |    |
    |    v
    +--> C -> D
    |
    +--> E
    """
def node(name, **kwargs):
return dict(id=name, lineage=dict(source_datasets=kwargs))
# Build the ABCDE graph and wrap the root in the navigable reader.
A, _, C, _, _ = make_graph_abcde(node)
rdr = SimpleDocNav(A)
assert rdr.doc == A
assert rdr.doc_without_lineage_sources == node('A')
assert isinstance(rdr.sources['ae'], SimpleDocNav)
assert rdr.sources['ab'].sources['bc'].doc == C
# Properties are cached: repeated access returns the identical object.
assert rdr.doc_without_lineage_sources is rdr.doc_without_lineage_sources
assert rdr.sources is rdr.sources
assert isinstance(rdr.sources_path, tuple)
# Visitor records "id:edge-name:depth" strings for traversal-order checks.
def visitor(node, name=None, depth=0, out=None):
s = '{}:{}:{:d}'.format(node.id, name if name else '..', depth)
out.append(s)
expect_preorder = '''
A:..:0
B:ab:1
# NOTE(review): orphaned fragment — continuation of the inconsistent-lineage
# subtests; the enclosing function is not visible here. Code kept byte-identical.
with pytest.raises(InvalidDocException, match=r'Inconsistent lineage .*'):
dedup_lineage(ds0)
# Subtest 2: different values for "child" nodes
ds0 = SimpleDocNav(gen_dataset_test_dag(7, force_tree=True))
srcs = toolz.get_in(ds0.sources_path, ds0.sources['ac'].doc)
assert 'cd' in srcs
# Changing one duplicate's child id makes the subtrees disagree.
srcs['cd']['id'] = '7fe57724-ed44-4beb-a3ab-c275339049be'
ds0 = SimpleDocNav(ds0.doc)
with pytest.raises(InvalidDocException, match=r'Inconsistent lineage .*'):
dedup_lineage(ds0)
# Subtest 3: different name for child
ds0 = SimpleDocNav(gen_dataset_test_dag(7, force_tree=True))
srcs = toolz.get_in(ds0.sources_path, ds0.sources['ac'].doc)
assert 'cd' in srcs
# Rename the edge 'cd' -> 'CD' in one duplicate only.
srcs['CD'] = srcs['cd']
del srcs['cd']
ds0 = SimpleDocNav(ds0.doc)
with pytest.raises(InvalidDocException, match=r'Inconsistent lineage .*'):
dedup_lineage(ds0)
# NOTE(review): this function is truncated — the triple-quoted YAML string is
# never closed in this chunk and the body is de-indented to column 0.
# Also note the typo "ambgious" in the name; not fixed here since renaming is a
# code change. Code kept byte-identical.
def test_dataset_add_ambgious_products(dataset_add_configs, index_empty, clirunner):
p = dataset_add_configs
index = index_empty
# Two datasets with identical metadata that could match more than one product.
dss = [SimpleDocNav(dataset_maker(i)(
'A',
product_type='eo',
flag_a='a',
flag_b='b')) for i in [1, 2]]
prefix = write_files({
'products.yml': '''
name: A
description: test product A
metadata_type: minimal
metadata:
    product_type: eo
    flag_a: a
---
name: B
# NOTE(review): this function is truncated — `if verify_lineage:` at the end has
# no body in this chunk, and the body is de-indented to column 0. It also reads
# `index`, `fail_on_missing_lineage` and `verify_lineage` from an enclosing
# scope not visible here. Code kept byte-identical.
def resolve(main_ds, uri):
try:
# Collapse duplicate lineage nodes first; a bad document aborts resolution.
main_ds = SimpleDocNav(dedup_lineage(main_ds))
except InvalidDocException as e:
return None, e
main_uuid = main_ds.id
# Map every uuid in the lineage tree to its first occurrence.
ds_by_uuid = toolz.valmap(toolz.first, flatten_datasets(main_ds))
all_uuid = list(ds_by_uuid)
# Look up which of those uuids are already present in the database.
db_dss = {str(ds.id): ds for ds in index.datasets.bulk_get(all_uuid)}
lineage_uuids = set(filter(lambda x: x != main_uuid, all_uuid))
missing_lineage = lineage_uuids - set(db_dss)
if missing_lineage and fail_on_missing_lineage:
return None, "Following lineage datasets are missing from DB: %s" % (','.join(missing_lineage))
if verify_lineage:
def __call__(self, doc, uri):
    """Attempt to construct dataset from metadata document and a uri.

    Fix: the original body was de-indented to column 0, which is a SyntaxError;
    indentation restored, statements unchanged.

    :param doc: Dictionary or SimpleDocNav object
    :param uri: String "location" property of the Dataset

    :return: (dataset, None) is successful,
    :return: (None, ErrorMessage) on failure
    """
    # Normalise raw dictionaries to the navigable document wrapper.
    if not isinstance(doc, SimpleDocNav):
        doc = SimpleDocNav(doc)

    dataset, err = self._ds_resolve(doc, uri)
    if dataset is None:
        return None, err

    # A resolved dataset may still be internally inconsistent; reject it
    # with the reason rather than returning a broken object.
    is_consistent, reason = check_dataset_consistent(dataset)
    if not is_consistent:
        return None, reason

    return dataset, None