Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_example_5():
    """Run a flow that types the ``population`` field and dumps the result.

    The flow starts from ``country_population()``, applies ``set_type`` to
    ``population`` (type ``number``, ``groupChar=','``) and ends with
    ``dump_to_path('out/country_population')``.
    """
    from dataflows import Flow, set_type, dump_to_path

    # Build the steps in the same order they are handed to the flow.
    source = country_population()
    population_as_number = set_type('population', type='number', groupChar=',')
    sink = dump_to_path('out/country_population')

    flow = Flow(source, population_as_number, sink)
    flow.process()
# NOTE(review): fragment - the enclosing function header is not visible here;
# `package` is presumably that function's argument (TODO confirm).
# Now iterate on all resources
resources = iter(package)
# Only the first item of the iterator is taken; the rest is not consumed here.
beatles = next(resources)
def f(row):
# Store in 'is_guitarist' whether the row's 'instrument' equals 'guitar'
# (mutates the row in place), then hand the same row back.
row['is_guitarist'] = row['instrument'] == 'guitar'
return row
# Yield a lazy map that applies f to each item of the first resource.
yield map(f, beatles)
# Chain three steps - load the CSV, apply add_is_guitarist_column, dump the
# output - then run the flow.
f = Flow(
    # Same one as above
    load('data/beatles.csv'),
    add_is_guitarist_column,
    dump_to_path('out/beatles_guitarists'),
)
_ = f.process()
def run_flow(datetime_format=None):
    """Process a one-row flow built from ``_today`` and ``_now``.

    The single row holds the string forms of ``_today`` and ``_now`` (both
    taken from the enclosing scope). ``today`` is typed as ``date`` and
    ``now`` as ``datetime``; the result is dumped to ``out/dump_dates``.

    Args:
        datetime_format: Passed as ``format`` to the ``set_type`` step for
            the ``now`` field. Defaults to ``None``.
    """
    record = {'today': str(_today), 'now': str(_now)}
    pipeline = Flow(
        [record],
        set_type('today', type='date'),
        set_type('now', type='datetime', format=datetime_format),
        dump_to_path('out/dump_dates'),
    )
    pipeline.process()
def test_save_load_dates():
    """Dump two timestamped rows, then load the written datapackage back.

    The first flow types ``ts`` as ``datetime`` with the format
    ``%Y-%m-%d/%H:%M:%S`` and dumps to ``out/test_save_load_dates``; the
    second flow loads the resulting ``datapackage.json`` and prints it.
    """
    from dataflows import Flow, dump_to_path, load, set_type, printer
    import datetime

    # One now() call per row, in id order, exactly as many as there are rows.
    rows = [
        {'id': row_id, 'ts': datetime.datetime.now()}
        for row_id in (1, 2)
    ]
    writer = Flow(
        rows,
        set_type('ts', type='datetime', format='%Y-%m-%d/%H:%M:%S'),
        dump_to_path('out/test_save_load_dates'),
    )
    writer.process()

    reader = Flow(
        load('out/test_save_load_dates/datapackage.json'),
        printer(),
    )
    # results() is unpacked into three values; only the first is named.
    res, _, _ = reader.results()
def test_load_from_env_var():
    """Check that ``load('env://NAME')`` reads the source path from the environment.

    A one-row datapackage is first dumped to ``out/load_from_env_var``. Its
    ``datapackage.json`` path is then published through the
    ``MY_DATAPACKAGE`` environment variable and loaded back via
    ``env://MY_DATAPACKAGE``. The loaded package must have exactly one
    resource whose rows equal the rows that were dumped.
    """
    import os
    # Flow is imported locally, like the sibling tests do, so this test does
    # not depend on a module-level import.
    from dataflows import Flow, load, dump_to_path

    Flow(
        [{'foo': 'bar'}],
        dump_to_path('out/load_from_env_var')
    ).process()

    # Remember any pre-existing value so the process environment is left
    # exactly as it was found, even if loading fails.
    previous = os.environ.get('MY_DATAPACKAGE')
    os.environ['MY_DATAPACKAGE'] = 'out/load_from_env_var/datapackage.json'
    try:
        results, dp, _ = Flow(
            load('env://MY_DATAPACKAGE')
        ).results()
    finally:
        if previous is None:
            os.environ.pop('MY_DATAPACKAGE', None)
        else:
            os.environ['MY_DATAPACKAGE'] = previous

    assert len(dp.resources) == 1
    assert results == [[{'foo': 'bar'}]]
def main(package_url):
    """Run the access-log aggregation flow, then dump the collected stats.

    The Jenkins credentials returned by ``ckan_manager.get_jenkins_token``
    are spliced into ``package_url`` after every ``https://`` occurrence.
    The first flow loads that URL, applies ``aggregate_stats(stats_rows)``
    and dumps to ``data/aggregate_access_logs``. The second flow streams
    ``stats_rows`` to ``data/aggregate_access_logs_stats`` and prints them.

    Args:
        package_url: URL of the package to load; credentials are injected
            into it before loading.
    """
    jenkins_user_token = ckan_manager.get_jenkins_token('ckan-cloud-operator-jenkins-creds')
    credentials_prefix = 'https://{}:{}@'.format(*jenkins_user_token)
    authenticated_url = package_url.replace('https://', credentials_prefix)

    # presumably filled in by aggregate_stats while the first flow runs -
    # verify against its definition.
    stats_rows = []
    aggregation = Flow(
        load(authenticated_url),
        aggregate_stats(stats_rows),
        dump_to_path('data/aggregate_access_logs'),
    )
    aggregation.process()

    stats_dump = Flow(
        (row for row in stats_rows),
        dump_to_path('data/aggregate_access_logs_stats'),
        printer(),
    )
    stats_dump.process()
# NOTE(review): fragment - the enclosing function header is not visible here;
# `prefix` and `operator` are presumably its parameters (TODO confirm).
# Returns a Flow object without calling process() on it.
return Flow(
cluster_info(operator),
# 'res_1' is given the name 'cluster-info' and the path 'cluster-info.csv'.
update_resource(['res_1'], name='cluster-info', path='cluster-info.csv'),
# Checkpoint whose name is derived from the prefix.
checkpoint(f'{prefix}-cluster-info'),
ckan_cloud_instances(operator),
# 'res_2' is given the name 'ckan-cloud-instances' and a matching .csv path.
update_resource(['res_2'], name='ckan-cloud-instances', path='ckan-cloud-instances.csv'),
)
if __name__ == '__main__':
    # DATAPACKAGE_PREFIX is required (KeyError when unset); the operator
    # binary falls back to 'ckan-cloud-operator' when
    # CKAN_CLOUD_OPERATOR_BIN is not set.
    prefix = os.environ['DATAPACKAGE_PREFIX']
    operator = os.environ.get('CKAN_CLOUD_OPERATOR_BIN', 'ckan-cloud-operator')

    # Run main_flow, print one row per resource, and dump everything under
    # data/<prefix>/resources.
    Flow(
        main_flow(prefix, operator),
        printer(num_rows=1),
        dump_to_path(f'data/{prefix}/resources'),
    ).process()
def list_instances():
    """Dump one row per CKAN instance to a datapackage and to a JSON file.

    Ensures ``data/list_instances`` exists, then runs a flow over
    ``get_instance_row(instance)`` for every instance returned by
    ``ckan_instance_manager.list_instances(full=True)``. The flow passes the
    rows through ``dump_to_json``, dumps them to ``data/list_instances`` and
    prints up to 99999 rows. Finally the collected list is written to
    ``data/list_instances.json``.
    """
    os.makedirs('data/list_instances', exist_ok=True)

    # presumably filled in by dump_to_json while the flow runs - verify
    # against its definition.
    collected = []
    instances = ckan_instance_manager.list_instances(full=True)
    pipeline = Flow(
        (get_instance_row(instance) for instance in instances),
        dump_to_json(collected),
        dump_to_path('data/list_instances'),
        printer(num_rows=99999),
    )
    pipeline.process()

    with open('data/list_instances.json', 'w') as out_file:
        json.dump(collected, out_file)