# Assumes the test module imports: from hashlib import sha1
# and: from followthemoney import model
def test_key_generation(self):
    mapping = model.make_mapping(
        {
            "csv_url": "http://pets.com",
            "entities": {"test": {"schema": "Person", "key": "id"}},
        }
    )
    # With no key prefix, every entity's seed digest is sha1 of the empty string.
    for ent in mapping.entities:
        seed = ent.seed.hexdigest()
        assert seed == sha1(b"").hexdigest(), seed
    # A record missing the key column produces no entities...
    entities = mapping.map({})
    assert len(entities) == 0, entities.keys()
    # ...while a record carrying it produces one, whose ID hashes the key value.
    entities = mapping.map({"id": "foo"})
    assert len(entities) == 1, entities.keys()
    ent0 = entities.get("test")
    assert ent0.id == sha1(b"foo").hexdigest(), ent0
def test_multiple_keys(self):
    mapping = model.make_mapping(
        {
            "csv_url": "http://pets.com",
            "entities": {"test": {"schema": "Person", "key": ["b", "a"]}},
        }
    )
    entities = mapping.map({"a": "aaa", "b": "bbb"})
    ent0 = entities.get("test")
    assert ent0.id == sha1(b"aaabbb").hexdigest(), ent0
    # The same keys in the opposite order must yield the same entity ID.
    mapping = model.make_mapping(
        {
            "csv_url": "http://pets.com",
            "entities": {"test": {"schema": "Person", "key": ["a", "b"]}},
        }
    )
    entities = mapping.map({"a": "aaa", "b": "bbb"})
    ent0 = entities.get("test")
    assert ent0.id == sha1(b"aaabbb").hexdigest(), ent0
def test_key_column_from_sql(self):
    mapping = self.kek_mapping
    # Replace the hashed keys with a direct ID column from the source table.
    del mapping["entities"]["company"]["keys"]
    mapping["entities"]["company"]["id_column"] = "comp.id"
    mapped = model.make_mapping(mapping)
    assert len(mapped.source) == 2904, len(mapped.source)
    assert len(mapped.entities) == 3, mapped.entities
    assert len(mapped.refs) == 7, mapped.refs
    entities = list(model.map_entities(mapping))
    self.assertGreaterEqual(int(entities[0].id), 3000)  # FIXME?
def bulk_load_query(stage, collection, query_id, query):
    namespace = Namespace(collection.foreign_id)
    mapping = model.make_mapping(query, key_prefix=collection.foreign_id)
    aggregator = get_aggregator(collection)
    writer = aggregator.bulk()
    entities_count = 0
    for idx, record in enumerate(mapping.source.records, 1):
        for entity in mapping.map(record).values():
            entity = namespace.apply(entity)
            entities_count += 1
            # Each source row writes its own fragment, keyed by query and row index.
            fragment = '%s-%s' % (query_id, idx)
            writer.put(entity, fragment=fragment)
        if idx > 0 and idx % 1000 == 0:
            stage.report_finished(1000)
            log.info("[%s] Loaded %s records, %s entities...",
                     collection.foreign_id,
                     idx, entities_count)
    writer.flush()
def make_mapper(collection, mapping):
    table = get_entity(mapping.table_id)
    properties = table.get('properties', {})
    csv_hash = first(properties.get('csvHash'))
    if csv_hash is None:
        raise RuntimeError("Source table doesn't have a CSV version")
    url = archive.generate_url(csv_hash)
    if not url:
        # Fall back to a local archive path when no public URL can be generated.
        local_path = archive.load_file(csv_hash)
        if local_path is not None:
            url = local_path.as_posix()
    if url is None:
        raise RuntimeError("Could not generate CSV URL for the table")
    data = {'csv_url': url, 'entities': mapping.query}
    return model.make_mapping(data, key_prefix=collection.foreign_id)
def stream_mapping(infile, outfile, mapping_yaml, sign=True):
    # Compile one mapping query per dataset entry in the YAML file.
    sources = []
    config = load_mapping_file(mapping_yaml)
    for dataset, meta in config.items():
        for data in keys_values(meta, "queries", "query"):
            query = model.make_mapping(data, key_prefix=dataset)
            source = StreamSource(query, data)
            sources.append((dataset, source))

    try:
        for record in StreamSource.read_csv(infile):
            for (dataset, source) in sources:
                ns = Namespace(dataset)
                if source.check_filters(record):
                    entities = source.query.map(record)
                    for entity in entities.values():
                        if sign:
                            entity = ns.apply(entity)
                        write_object(outfile, entity)
    except BrokenPipeError:
        raise click.Abort()
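
For reference, stream_mapping expects the loaded YAML to map dataset names to one or more queries, each of the same shape as the dicts passed to model.make_mapping in the tests above. A minimal sketch of that structure as the Python object load_mapping_file would return; the dataset name, URL and key column are illustrative placeholders:

# Hypothetical config shape consumed by stream_mapping above.
config = {
    "my_dataset": {
        "queries": [
            {
                "csv_url": "http://pets.com",
                "entities": {"owner": {"schema": "Person", "key": "id"}},
            }
        ]
    }
}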
def load_query():
    try:
        query = request.json.get('mapping_query', {})
        # Run the query through the model purely for validation.
        model.make_mapping({'entities': query})
    except Exception as ex:
        log.exception("Validation error: %s", request.json)
        raise BadRequest(str(ex))
    return query
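
Judging from the entity definitions used in the tests above, a valid mapping_query payload carries per-entity schema and key definitions. A hypothetical request body, with illustrative names and values:

# Hypothetical payload accepted by load_query; entity name, schema and key
# column are placeholders.
payload = {
    "mapping_query": {
        "owner": {"schema": "Person", "key": "id"},
    }
}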