Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_save_load_dates():
    """Write rows holding datetime values to disk, then load them back.

    Two rows stamped with the current time are typed as ``datetime`` with
    the custom ``%Y-%m-%d/%H:%M:%S`` format and dumped under
    ``out/test_save_load_dates``. The generated ``datapackage.json`` is then
    loaded and printed. There are no explicit assertions: the test passes
    as long as neither flow raises.
    """
    from dataflows import Flow, dump_to_path, load, set_type, printer
    import datetime

    # Each row gets its own call to now(), in id order.
    rows = [
        {'id': row_id, 'ts': datetime.datetime.now()}
        for row_id in (1, 2)
    ]

    writer = Flow(
        rows,
        set_type('ts', type='datetime', format='%Y-%m-%d/%H:%M:%S'),
        dump_to_path('out/test_save_load_dates'),
    )
    writer.process()

    reader = Flow(
        load('out/test_save_load_dates/datapackage.json'),
        printer(),
    )
    # results() is unpacked into three values; none are inspected further.
    _loaded, _package, _stats = reader.results()
def test_rename_resource():
    """Check that a package-level step can rename the first resource.

    The ``rename`` step rewrites the name of resource 0 in the package
    descriptor, and the test asserts that the descriptor returned by
    ``results()`` carries the new name.
    """
    from dataflows import Flow, printer, PackageWrapper, ResourceWrapper

    # NOTE(review): dataflows appears to choose how to run a step from its
    # parameter name, so `package` should not be renamed -- confirm.
    def rename(package: PackageWrapper):
        descriptor = package.pkg.descriptor
        descriptor['resources'][0]['name'] = 'renamed'
        # Emit the (modified) package first, then the resources.
        yield package.pkg
        remaining = iter(package)
        head: ResourceWrapper = next(remaining)
        yield head.it
        yield from package

    source_rows = ({'a': x} for x in range(10))
    pipeline = Flow(
        source_rows,
        rename,
        printer(),
    )
    _rows, datapackage, _stats = pipeline.results()
    print(datapackage.descriptor)

    first_resource = datapackage.descriptor['resources'][0]
    assert first_resource['name'] == 'renamed'
def test_exception_in_generator():
    """An error raised by a source generator is reported as ProcessorError.

    The generator raises ``MyException`` on its first iteration. The test
    expects ``process()`` to raise ``exceptions.ProcessorError`` whose
    ``cause`` attribute is that ``MyException`` instance.
    """
    from dataflows import Flow, printer, exceptions

    def generator():
        for i in range(5):
            raise MyException()
            # Never reached, but the `yield` is what makes this a generator.
            yield {"i": i}

    with pytest.raises(exceptions.ProcessorError) as excinfo:
        pipeline = Flow(generator(), printer())
        pipeline.process()

    wrapped = excinfo.value
    assert isinstance(wrapped.cause, MyException)
def test_validate():
    """A row that no longer matches its declared type must fail validate().

    Column ``a`` is declared as ``integer``; the ``adder`` step then
    replaces each value with the string form of ``value + 0.5``. The test
    expects ``process()`` to raise ``exceptions.ProcessorError`` with a
    ``ValidationError`` as its ``cause``.
    """
    from dataflows import Flow, validate, set_type, printer, ValidationError, exceptions

    # NOTE(review): dataflows appears to choose how to run a step from its
    # parameter name, so `row` should not be renamed -- confirm.
    def adder(row):
        bumped = row['a'] + 0.5
        row['a'] = str(bumped)

    source_rows = ({'a': x} for x in range(10))
    pipeline = Flow(
        source_rows,
        set_type('a', type='integer'),
        adder,
        validate(),
        printer(),
    )

    with pytest.raises(exceptions.ProcessorError) as excinfo:
        pipeline.process()

    wrapped = excinfo.value
    assert isinstance(wrapped.cause, ValidationError)
def test_exception_information_multiple_processors_iterable_error():
    """ProcessorError must identify the failing iterable source.

    The source generator yields one row and then raises ``MyException``.
    The resulting ``ProcessorError`` is expected to expose the original
    message through ``cause``, name the processor ``iterable_loader`` and
    report position 1.
    """
    from dataflows import Flow, printer, exceptions

    def func():
        for value in range(10):
            if value != 1:
                yield {'a': value}
                continue
            # Second iteration: fail after a single row has been produced.
            raise MyException('custom-iterable-error')

    flow = Flow(func(), printer())

    with pytest.raises(exceptions.ProcessorError) as excinfo:
        flow.results()

    error = excinfo.value
    assert str(error.cause) == 'custom-iterable-error'
    assert error.processor_name == 'iterable_loader'
    assert error.processor_position == 1
def test_update_schema():
    """update_schema(missingValues=...) should turn '-' cells into None.

    Two list rows are fed in, ``'-'`` is declared as a missing value via
    ``update_schema`` and the rows are validated. The first result set is
    expected to contain ``None`` in place of ``'-'``.
    """
    from dataflows import Flow, printer, update_schema, validate

    pipeline = Flow(
        [['a', '-'], ['a', 0]],
        # NOTE(review): -1 presumably selects the last resource -- confirm.
        update_schema(-1, missingValues=['-']),
        validate(),
        printer(),
    )
    rows_per_resource, datapackage, _stats = pipeline.results()
    print(datapackage.descriptor)

    expected = [
        {'col0': 'a', 'col1': None},
        {'col0': 'a', 'col1': 0},
    ]
    assert rows_per_resource[0] == expected
def main(package_url):
    """Load a package over authenticated HTTPS and dump aggregated stats.

    Credentials returned by ``ckan_manager.get_jenkins_token`` are spliced
    into every ``https://`` occurrence of ``package_url``. The package is
    then run through ``aggregate_stats`` and written to
    ``data/aggregate_access_logs``. The rows collected in ``stats_rows``
    are written to ``data/aggregate_access_logs_stats`` and printed.

    Args:
        package_url: URL of the package to load; expected to use https.
    """
    credentials = ckan_manager.get_jenkins_token('ckan-cloud-operator-jenkins-creds')
    authenticated_prefix = 'https://{}:{}@'.format(*credentials)
    package_url = package_url.replace('https://', authenticated_prefix)

    # Handed to aggregate_stats and read back once the first flow is done.
    # NOTE(review): presumably aggregate_stats appends to it -- confirm.
    stats_rows = []

    aggregation = Flow(
        load(package_url),
        aggregate_stats(stats_rows),
        dump_to_path('data/aggregate_access_logs'),
    )
    aggregation.process()

    stats_dump = Flow(
        (stat for stat in stats_rows),
        dump_to_path('data/aggregate_access_logs_stats'),
        printer(),
    )
    stats_dump.process()
def list_instances():
    """Dump one row per CKAN instance to a data package and a JSON file.

    Rows are produced by ``get_instance_row`` for every instance returned
    by ``ckan_instance_manager.list_instances(full=True)``. They are
    written to ``data/list_instances`` and printed, and the ``data`` list
    is finally serialised to ``data/list_instances.json``.
    """
    os.makedirs('data/list_instances', exist_ok=True)

    # NOTE(review): dump_to_json presumably appends each row to `data`,
    # which is what ends up in the JSON file below -- confirm.
    data = []

    instances = ckan_instance_manager.list_instances(full=True)
    pipeline = Flow(
        (get_instance_row(item) for item in instances),
        dump_to_json(data),
        dump_to_path('data/list_instances'),
        printer(num_rows=99999),
    )
    pipeline.process()

    with open('data/list_instances.json', 'w') as json_file:
        json.dump(data, json_file)
def flow(parameters):
    """Return a Flow whose only step is ``printer()``.

    Args:
        parameters: Accepted for interface compatibility; not read here.

    Returns:
        A ``Flow`` wrapping a single ``printer()`` step.
    """
    pipeline = Flow(printer())
    return pipeline