Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
data2 = [
dict(x=1, y='a', xx=True),
dict(x=2, y='b', xx=True),
dict(x=3, y='c', xx=True),
]
f = Flow(
data,
delete_fields(['x'])
)
results, dp, _ = f.results()
for i in results[0]:
assert list(i.keys()) == ['y']
assert dp.descriptor['resources'][0]['schema']['fields'] == \
[dict(name='y', type='string', format='default')]
f = Flow(
data,
data2,
delete_fields(['x+'])
)
results, dp, _ = f.results()
for res in results:
for i in res:
assert list(i.keys()) == ['y']
assert dp.descriptor['resources'][0]['schema']['fields'] == \
[dict(name='y', type='string', format='default')]
def test_load_from_package_resource_matching():
    """Loading a dumped package with ``resources=['res_2']`` keeps only that resource.

    Two single-row inline sources are dumped to a package directory; loading
    the package's ``datapackage.json`` back with a resource filter must yield
    exactly one resource, containing the second source's row.
    """
    from dataflows import dump_to_path, load

    # One definition of the output directory so the dump and the load
    # always point at the same place.
    out_dir = 'out/load_from_package_resource_matching'

    Flow(
        [{'foo': 'bar'}],
        [{'foo': 'baz'}],
        dump_to_path(out_dir),
    ).process()

    ds = Flow(
        load(f'{out_dir}/datapackage.json', resources=['res_2']),
    ).datastream()
    assert len(ds.dp.resources) == 1
    assert [list(res) for res in ds.res_iter] == [[{'foo': 'baz'}]]
def test_add_computed_field_func():
    """add_computed_field accepts both a callable operation and a 'format' template."""
    from dataflows import add_computed_field

    # Field computed by an arbitrary Python callable.
    squared_spec = {
        'target': {'name': 'sq', 'type': 'integer'},
        'operation': lambda row: row['x'] ** 2,
    }
    # Field computed from a string template over the row's values.
    formatted_spec = {'target': 'f', 'operation': 'format', 'with_': '{x} - {x}'}

    source_rows = [{'x': i} for i in range(3)]
    all_resources, *_ = Flow(
        source_rows,
        add_computed_field([squared_spec, formatted_spec]),
    ).results()

    assert list(all_resources[0]) == [
        {'x': 0, 'sq': 0, 'f': '0 - 0'},
        {'x': 1, 'sq': 1, 'f': '1 - 1'},
        {'x': 2, 'sq': 4, 'f': '2 - 2'},
    ]
def test_load_xml():
    """Loading data/sample.xml yields the three expected book rows."""
    from dataflows import Flow, load

    results, _dp, _stats = Flow(load('data/sample.xml')).results()

    expected_books = [
        {'title': 'The Fellowship of the Ring', 'publication-year': 1954},
        {'title': 'The Two Towers', 'publication-year': 1954},
        {'title': 'The Return of the King', 'publication-year': 1955},
    ]
    assert results[0] == expected_books
def flow(parameters, datapackage, resources, stats):
    """Return a Flow that adds a constant-valued string column.

    The new column's name is taken from ``parameters['attr']``. Every row
    gets the value ``'foo'`` in it, and ``stats['foo_values']`` is reset to 0
    and then incremented once per row processed. ``datapackage`` and
    ``resources`` are accepted but not used in this function.
    """
    stats['foo_values'] = 0

    def add_foo_field(package: PackageWrapper):
        # Declare the new column on the first resource's schema, then pass
        # the descriptor and all resources through unchanged.
        first_schema = package.pkg.descriptor['resources'][0]['schema']
        first_schema['fields'] += [{'name': parameters['attr'], 'type': 'string'}]
        yield package.pkg
        yield from package

    def add_foo_value(row):
        # Called with each row: fill the new column in place and count it.
        row[parameters['attr']] = 'foo'
        stats['foo_values'] += 1

    # NOTE(review): hello_dataflows is defined elsewhere; presumably a source step.
    pipeline_steps = (add_metadata(name='_'), hello_dataflows, add_foo_field, add_foo_value)
    return Flow(*pipeline_steps)
def test_sort_rows():
    """sort_rows with key '{b}{a}' orders rows by b, then by a."""
    from dataflows import sort_rows

    def as_rows(pairs):
        # Build {'a': ..., 'b': ...} dicts from (a, b) tuples.
        return [{'a': a, 'b': b} for a, b in pairs]

    unsorted_rows = as_rows([(1, 3), (2, 3), (3, 1), (4, 1)])
    results, _, _ = Flow(unsorted_rows, sort_rows(key='{b}{a}')).results()

    assert list(results[0]) == as_rows([(3, 1), (4, 1), (1, 3), (2, 3)])
def test_example_2():
    """Run a flow that title-cases the ``name`` column of data/beatles.csv.

    NOTE(review): this only checks that the flow runs to completion; it makes
    no assertion about the resulting rows.
    """
    from dataflows import Flow, load

    def title_case_name(row):
        # Mutates the row in place; the parameter must stay named ``row``.
        row['name'] = row['name'].title()

    pipeline = Flow(load('data/beatles.csv'), title_case_name)
    data, *_ = pipeline.results()
def main(package_url):
    """Aggregate access-log stats from a credential-protected datapackage.

    The value returned by ``ckan_manager.get_jenkins_token`` (presumably a
    ``(user, token)`` pair) is spliced into *package_url* as ``user:token@``.
    The package is then loaded, passed through ``aggregate_stats(stats_rows)``
    and dumped to ``data/aggregate_access_logs``. Afterwards ``stats_rows`` is
    dumped to ``data/aggregate_access_logs_stats`` and printed.

    :param package_url: URL of the datapackage; credentials are injected only
        when it starts with ``https://``.
    """
    from urllib.parse import quote

    scheme = 'https://'
    jenkins_user_token = ckan_manager.get_jenkins_token('ckan-cloud-operator-jenkins-creds')
    if package_url.startswith(scheme):
        # Percent-encode each credential part so '@', ':', '/' etc. cannot
        # corrupt the URL's authority section. Only the leading scheme is
        # rewritten: replacing every 'https://' would also inject the
        # credentials into any URL embedded later in the string (e.g. a
        # query parameter), leaking them.
        credentials = [quote(str(part), safe='') for part in jenkins_user_token]
        package_url = scheme + '{}:{}@'.format(*credentials) + package_url[len(scheme):]

    stats_rows = []  # presumably filled by aggregate_stats during the first flow
    Flow(
        load(package_url),
        aggregate_stats(stats_rows),
        dump_to_path('data/aggregate_access_logs')
    ).process()
    # The generator is consumed only when this second flow runs, i.e. after
    # the first flow has finished.
    Flow(
        (row for row in stats_rows),
        dump_to_path('data/aggregate_access_logs_stats'),
        printer()
    ).process()
def main(package_url):
    """Aggregate access-log stats from a credential-protected datapackage.

    The value returned by ``ckan_manager.get_jenkins_token`` (presumably a
    ``(user, token)`` pair) is spliced into *package_url* as ``user:token@``.
    The package is then loaded, passed through ``aggregate_stats(stats_rows)``
    and dumped to ``data/aggregate_access_logs``. Afterwards ``stats_rows`` is
    dumped to ``data/aggregate_access_logs_stats`` and printed.

    :param package_url: URL of the datapackage; credentials are injected only
        when it starts with ``https://``.
    """
    from urllib.parse import quote

    scheme = 'https://'
    jenkins_user_token = ckan_manager.get_jenkins_token('ckan-cloud-operator-jenkins-creds')
    if package_url.startswith(scheme):
        # Percent-encode each credential part so '@', ':', '/' etc. cannot
        # corrupt the URL's authority section. Only the leading scheme is
        # rewritten: replacing every 'https://' would also inject the
        # credentials into any URL embedded later in the string (e.g. a
        # query parameter), leaking them.
        credentials = [quote(str(part), safe='') for part in jenkins_user_token]
        package_url = scheme + '{}:{}@'.format(*credentials) + package_url[len(scheme):]

    stats_rows = []  # presumably filled by aggregate_stats during the first flow
    Flow(
        load(package_url),
        aggregate_stats(stats_rows),
        dump_to_path('data/aggregate_access_logs')
    ).process()
    # The generator is consumed only when this second flow runs, i.e. after
    # the first flow has finished.
    Flow(
        (row for row in stats_rows),
        dump_to_path('data/aggregate_access_logs_stats'),
        printer()
    ).process()
from .stream import stream
from .unstream import unstream
def _notify_checkpoint_saved(checkpoint_name):
    """Build a pass-through package step that announces a saved checkpoint.

    The returned step re-yields the package descriptor and wraps every
    resource's rows in a fresh generator without modifying them. Once all
    resources have been handed on and the step itself is exhausted, it prints
    ``checkpoint saved: <checkpoint_name>``.
    """
    def step(package):
        # The parameter must stay named ``package``.
        yield package.pkg
        for resource_rows in package:
            yield (row for row in resource_rows)
        print(f"checkpoint saved: {checkpoint_name}")

    return step
class checkpoint(Flow):
    """A Flow tied to a named checkpoint directory on disk.

    The checkpoint directory is ``<checkpoint_path>/<checkpoint_name>``, and
    its stream file is ``stream.ndjson`` inside that directory.
    """

    def __init__(self, checkpoint_name, checkpoint_path='.checkpoints', steps=None, resources=None):
        """Create the checkpoint flow.

        :param checkpoint_name: name of the checkpoint (also its directory name).
        :param checkpoint_path: parent directory holding all checkpoints.
        :param steps: flow steps to wrap; any falsy value means no steps.
        :param resources: stored as-is on ``self.resources``.
        """
        super().__init__(*(steps or []))
        self.checkpoint_name = checkpoint_name
        self.checkpoint_path = os.path.join(checkpoint_path, checkpoint_name)
        self.resources = resources

    @property
    def filename(self):
        """Path of ``stream.ndjson`` inside this checkpoint's directory."""
        stream_file = 'stream.ndjson'
        return os.path.join(self.checkpoint_path, stream_file)

    def exists(self):
        """Return True if the checkpoint's stream file path exists on disk."""
        target = self.filename
        return os.path.exists(target)