Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"properties": {"name": {"column": "name"}},
},
"ownership": {
"schema": "Ownership",
"keys": ["person_id", "le_id"],
"properties": {
"owner": {"entity": "owner"},
"asset": {"entity": "person"},
"percentage": {"column": "percentage"},
},
},
},
}
with self.assertRaises(InvalidMapping):
list(model.map_entities(mapping_slavery))
def test_kek_sqlite(self):
    """Map ``self.kek_mapping`` and check the entity and distinct-ID counts."""
    mapped = list(model.map_entities(self.kek_mapping))
    total = len(mapped)
    assert total == 8712, total
    # Several mapped entities share an ID, so the distinct count is lower.
    distinct_ids = {entity.id for entity in mapped}
    distinct_total = len(distinct_ids)
    assert distinct_total == 5607, distinct_total
"expert": {
"schema": "Person",
"key": "id",
"id_column": "id",
"properties": {
"name": {"column": "name"},
"nationality": {"column": "nationality"},
"gender": {"column": "gender"},
},
}
},
}
# only use key/keys or id_column, not both
with self.assertRaises(InvalidMapping):
list(model.map_entities(mapping))
del mapping["entities"]["expert"]["key"]
entities = list(model.map_entities(mapping))
self.assertEqual(len(entities), 14)
self.assertEqual(entities[0].id, "1")
self.assertEqual(entities[-1].id, "42")
"director": {
"schema": "Person",
"key": "id",
"key_literal": "person",
"properties": {
"name": {"column": "name"},
"address": {
"join": ", ",
"columns": ["house_number", "town", "zip"],
},
},
}
},
}
entities = list(model.map_entities(mapping))
assert len(entities) == 1, len(entities)
assert entities[0].get("address") == ["64, The Desert, 01234"], entities # noqa
mapping = {
"csv_url": url,
"entities": {
"director": {
"schema": "Person",
"key": "id",
"key_literal": "person",
"properties": {
"name": {"column": "name"},
"notes": {"split": "; ", "column": "fave_colours"},
},
}
},
}
entities = list(model.map_entities(mapping))
assert len(entities) == 1, len(entities)
self.assertCountEqual(
entities[0].get("notes"), ["brown", "black", "blue"]
) # noqa
def run_mapping(outfile, mapping_yaml, sign=True):
    """Run every query mapping of a mapping file and write out the entities.

    The mapping file is loaded with ``load_mapping_file``. For each dataset
    in it, every mapping found under its ``queries`` / ``query`` keys is fed
    to ``model.map_entities`` with the dataset name as ``key_prefix``, and
    each resulting entity is written to ``outfile`` with ``write_object``.

    Args:
        outfile: Output handle passed through to ``write_object``.
        mapping_yaml: Mapping file reference passed to ``load_mapping_file``.
        sign: When true, each entity is passed through the dataset's
            ``Namespace.apply`` before being written.

    Raises:
        click.Abort: If writing fails with ``BrokenPipeError``.
        click.ClickException: For any other exception raised while mapping
            or writing; the message is ``str()`` of the original exception.
    """
    # Loading happens outside the ``try``: errors from reading the mapping
    # file propagate unchanged rather than being wrapped for click.
    config = load_mapping_file(mapping_yaml)
    try:
        for dataset, meta in config.items():
            ns = Namespace(dataset)
            for mapping in keys_values(meta, "queries", "query"):
                entities = model.map_entities(mapping, key_prefix=dataset)
                for entity in entities:
                    if sign:
                        entity = ns.apply(entity)
                    write_object(outfile, entity)
    except BrokenPipeError as exc:
        # Chain explicitly so the original error is recorded as the cause.
        raise click.Abort() from exc
    except Exception as exc:
        raise click.ClickException(str(exc)) from exc
def execute_mapping(query):
    """Print a statement and its inverse for each prefixed property value.

    Every entity produced by ``model.map_entities(query)`` is treated as a
    dict: its ``'schema'`` is looked up on the model, its ``'properties'``
    are popped off, and a reference is built from what remains. For each
    property value whose property type has a truthy ``prefix``, a
    ``Statement`` is printed, followed by its inverted form.
    """
    for mapped in model.map_entities(query):
        schema = model.get(mapped['schema'])
        # Properties are removed before the reference is built from the rest.
        props = mapped.pop('properties')
        ref = registry.entity.ref(mapped)
        for prop_name, prop_values in props.items():
            prop = schema.get(prop_name)
            for item in prop_values:
                if not prop.type.prefix:
                    continue
                statement = Statement(ref, prop, item)
                print(statement)
                print(statement.invert())