Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@cli.command('export-excel', help="Export to Excel")
@click.option('-i', '--infile', type=click.File('r'), default='-') # noqa
@click.option('-o', '--outfile', type=click.Path(), required=True) # noqa
def export_excel(infile, outfile):
workbook = get_workbook()
try:
while True:
entity = read_entity(infile)
if entity is None:
break
write_entity(workbook, entity)
workbook.save(outfile)
except BrokenPipeError:
raise click.Abort()
@cli.command('import-ocds', help="Import open contracting data")
@click.option('-i', '--infile', type=click.File('r'), default='-') # noqa
@click.option('-o', '--outfile', type=click.File('w'), default='-') # noqa
def import_ocds(infile, outfile):
try:
while True:
line = infile.readline()
if not line:
return
record = json.loads(line)
for entity in convert_record(record):
if entity.id is not None:
write_object(outfile, entity)
except BrokenPipeError:
raise click.Abort()
@cli.command("map", help="Execute a mapping file and emit objects")
@click.option("-o", "--outfile", type=click.File("w"), default="-") # noqa
@click.option(
"--sign/--no-sign", is_flag=True, default=True, help="Apply HMAC signature"
) # noqa
@click.argument("mapping_yaml", type=click.Path(exists=True))
def run_mapping(outfile, mapping_yaml, sign=True):
config = load_mapping_file(mapping_yaml)
try:
for dataset, meta in config.items():
ns = Namespace(dataset)
for mapping in keys_values(meta, "queries", "query"):
entities = model.map_entities(mapping, key_prefix=dataset)
for entity in entities:
if sign:
entity = ns.apply(entity)
write_object(outfile, entity)
@cli.command("export-rdf", help="Export to RDF NTriples")
@click.option("-i", "--infile", type=click.File("r"), default="-") # noqa
@click.option("-o", "--outfile", type=click.File("w"), default="-") # noqa
@click.option(
"--qualified/--unqualified",
is_flag=True,
default=True,
help="Generate full predicates",
) # noqa
def export_rdf(infile, outfile, qualified=True):
exporter = RDFExporter(outfile, qualified=qualified)
export_stream(exporter, infile)
@cli.command("map-csv", help="Map CSV data from stdin and emit objects")
@click.option("-i", "--infile", type=click.File("r"), default="-") # noqa
@click.option("-o", "--outfile", type=click.File("w"), default="-") # noqa
@click.option(
"--sign/--no-sign", is_flag=True, default=True, help="Apply HMAC signature"
) # noqa
@click.argument("mapping_yaml", type=click.Path(exists=True))
def stream_mapping(infile, outfile, mapping_yaml, sign=True):
sources = []
config = load_mapping_file(mapping_yaml)
for dataset, meta in config.items():
for data in keys_values(meta, "queries", "query"):
query = model.make_mapping(data, key_prefix=dataset)
source = StreamSource(query, data)
sources.append((dataset, source))
try:
@cli.command("enrich", help="Find matching entities remotely")
@click.option("-i", "--infile", type=click.File("r"), default="-") # noqa
@click.option("-o", "--outfile", type=click.File("w"), default="-") # noqa
@click.argument("enricher")
def enrich(infile, outfile, enricher):
enricher = load_enricher(enricher)
try:
while True:
entity = read_entity(infile)
if entity is None:
break
for match in enricher.enrich_entity_raw(entity):
write_object(outfile, match)
except BrokenPipeError:
raise click.Abort()
@cli.command("match-entities", help="Unnests matches into entities")
@click.option("-i", "--infile", type=click.File("r"), default="-") # noqa
@click.option("-o", "--outfile", type=click.File("w"), default="-") # noqa
@click.option(
"-a", "--all", is_flag=True, default=False, help="Unnest non-positive matches"
) # noqa
def match_entities(infile, outfile, all):
try:
for match in Match.from_file(model, infile):
if not all and match.decision is not True:
continue
if match.canonical is not None:
write_object(outfile, match.canonical)
if match.entity is not None:
write_object(outfile, match.entity)
except BrokenPipeError:
raise click.Abort()
@cli.command("export-csv", help="Export to CSV")
@click.option("-i", "--infile", type=click.File("r"), default="-") # noqa
@click.option(
"-o",
"--outdir",
type=click.Path(file_okay=False, writable=True),
default=".",
help="output directory",
) # noqa
def export_csv(infile, outdir):
exporter = CSVExporter(outdir)
export_stream(exporter, infile)
@cli.command('export-cypher', help="Export to Cypher script")
@click.option('-i', '--infile', type=click.File('r'), default='-') # noqa
@click.option('-o', '--outfile', type=click.File('w'), default='-') # noqa
def export_cypher(infile, outfile):
exporter = CypherGraphExport(outfile)
try:
while True:
entity = read_entity(infile)
if entity is None:
break
exporter.write(entity)
except BrokenPipeError:
raise click.Abort()
@cli.command("expand", help="Expand enriched entities")
@click.option("-i", "--infile", type=click.File("r"), default="-") # noqa
@click.option("-o", "--outfile", type=click.File("w"), default="-") # noqa
@click.argument("enricher")
def expand(infile, outfile, enricher):
enricher = load_enricher(enricher)
try:
while True:
entity = read_entity(infile)
if entity is None:
break
for entity in enricher.expand_entity(entity):
write_object(outfile, entity)
except BrokenPipeError:
raise click.Abort()