Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run_data_count_flow():
    """Run the data-count views through the 'test_checkpoint' checkpoint.

    Asserts that the first element returned by ``Flow.results()`` equals
    ``[[{'foo': 'bar'}]]``.
    """
    flow = Flow(
        get_data_count_views(),
        checkpoint('test_checkpoint'),
    )
    first_result = flow.results()[0]
    assert first_result == [[{'foo': 'bar'}]]
def test_load_from_checkpoint():
    """Push one row through a checkpoint, then read it back via the checkpoint alone.

    The second flow contains only the checkpoint step and is expected to
    yield the row that the first flow processed.
    """
    from dataflows import Flow, checkpoint
    import shutil

    checkpoint_name = 'test_load_from_checkpoint'

    # Clear this directory first (errors ignored) — presumably where
    # checkpoint() keeps its data; verify against dataflows.
    shutil.rmtree('.checkpoints/test_load_from_checkpoint', ignore_errors=True)

    writing_flow = Flow([{'foo': 'bar'}], checkpoint(checkpoint_name))
    assert writing_flow.process()

    reading_flow = Flow(checkpoint(checkpoint_name))
    assert reading_flow.results()[0] == [[{'foo': 'bar'}]]
def test_load_from_checkpoint():
    """Process a single row with a checkpoint step, then reload it from that checkpoint.

    NOTE(review): a function with this same name is defined earlier in this
    file; at module level the later definition replaces the earlier one.
    """
    from dataflows import Flow, checkpoint
    import shutil

    expected_rows = [{'foo': 'bar'}]

    # Remove the directory up front, ignoring errors such as it not existing.
    shutil.rmtree('.checkpoints/test_load_from_checkpoint', ignore_errors=True)

    processed = Flow(
        [{'foo': 'bar'}],
        checkpoint('test_load_from_checkpoint'),
    ).process()
    assert processed

    reloaded = Flow(checkpoint('test_load_from_checkpoint')).results()
    assert reloaded[0] == [expected_rows]
# NOTE(review): these statements have no enclosing `def` in this view. They
# repeat the body of test_load_dates_timezones (defined right after this run)
# without its `from dataflows ...` / `from datetime ...` imports — presumably
# a truncated copy; confirm before relying on it.
import shutil
# Two sample values: datetime.now() called without a tz argument, and a UTC
# "now" passed through astimezone().
dates = [
datetime.now(),
datetime.now(timezone.utc).astimezone()
]
# Remove this directory before running; errors are ignored.
shutil.rmtree('.checkpoints/test_load_dates_timezones', ignore_errors=True)
# First flow: one row per sample value, with both its date() and the datetime itself.
Flow(
[{'date': d.date(), 'datetime': d} for d in dates],
checkpoint('test_load_dates_timezones')
).process()
# Second flow: only the checkpoint step; its results are compared with `dates` below.
results = Flow(
checkpoint('test_load_dates_timezones')
).results()
# The 'date' column of results[0][0] must equal the date() of each sample.
assert list(map(lambda x: x['date'], results[0][0])) == \
list(map(lambda x: x.date(), dates))
# The 'datetime' column of results[0][0] must equal the samples themselves.
assert list(map(lambda x: x['datetime'], results[0][0])) == \
list(map(lambda x: x, dates))
def test_load_dates_timezones():
    """Check that date and datetime values come back unchanged from a checkpoint.

    Two datetimes are used: ``datetime.now()`` with no tz argument, and a UTC
    "now" passed through ``astimezone()``. Each row carries the value's
    ``date()`` and the datetime itself; a second flow holding only the
    checkpoint step must return equal values in the same order.
    """
    from dataflows import Flow, checkpoint
    from datetime import datetime, timezone
    import shutil

    now_without_tz = datetime.now()
    now_with_tz = datetime.now(timezone.utc).astimezone()
    dates = [now_without_tz, now_with_tz]

    # Remove this directory before running; errors are ignored.
    shutil.rmtree('.checkpoints/test_load_dates_timezones', ignore_errors=True)

    source_rows = [{'date': d.date(), 'datetime': d} for d in dates]
    Flow(source_rows, checkpoint('test_load_dates_timezones')).process()

    results = Flow(checkpoint('test_load_dates_timezones')).results()
    loaded_rows = results[0][0]

    assert [row['date'] for row in loaded_rows] == [d.date() for d in dates]
    assert [row['datetime'] for row in loaded_rows] == list(dates)
def main_flow(prefix, operator):
    """Build the Flow that chains the cluster-info and ckan-cloud-instances steps.

    Args:
        prefix: Prepended to ``-cluster-info`` to form the checkpoint name.
        operator: Passed to both ``cluster_info`` and ``ckan_cloud_instances``.

    Returns:
        A ``Flow`` made of the five steps below, in this order.
    """
    steps = (
        cluster_info(operator),
        update_resource(['res_1'], name='cluster-info', path='cluster-info.csv'),
        # The checkpoint sits between the cluster-info steps and the instances steps.
        checkpoint(f'{prefix}-cluster-info'),
        ckan_cloud_instances(operator),
        update_resource(['res_2'], name='ckan-cloud-instances', path='ckan-cloud-instances.csv'),
    )
    return Flow(*steps)