Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
data2 = [
dict(x=1, y='a', xx=True),
dict(x=2, y='b', xx=True),
dict(x=3, y='c', xx=True),
]
f = Flow(
data,
delete_fields(['x'])
)
results, dp, _ = f.results()
for i in results[0]:
assert list(i.keys()) == ['y']
assert dp.descriptor['resources'][0]['schema']['fields'] == \
[dict(name='y', type='string', format='default')]
f = Flow(
data,
data2,
delete_fields(['x+'])
)
results, dp, _ = f.results()
for res in results:
for i in res:
assert list(i.keys()) == ['y']
assert dp.descriptor['resources'][0]['schema']['fields'] == \
[dict(name='y', type='string', format='default')]
def test_load_from_package_resource_matching():
    """Dump two single-row resources, then load back only ``res_2`` by name.

    Passing ``resources=['res_2']`` to ``load`` must produce a datapackage
    with exactly one resource, whose rows are those of the second input.
    """
    from dataflows import dump_to_path, load

    first_rows = [{'foo': 'bar'}]
    second_rows = [{'foo': 'baz'}]
    # NOTE(review): the trailing '(' in this directory name looks like a typo;
    # it is kept verbatim so the dump path and the load path keep matching.
    Flow(
        first_rows,
        second_rows,
        dump_to_path('out/load_from_package_resource_matching(')
    ).process()

    loader = load('out/load_from_package_resource_matching(/datapackage.json',
                  resources=['res_2'])
    stream = Flow(loader).datastream()

    assert len(stream.dp.resources) == 1
    loaded = [list(resource_rows) for resource_rows in stream.res_iter]
    assert loaded == [[{'foo': 'baz'}]]
def test_add_computed_field_func():
    """Check ``add_computed_field`` with a callable and a ``format`` operation.

    ``sq`` is produced by a lambda over each row, and ``f`` is rendered from
    the ``'{x} - {x}'`` template through the ``format`` operation.
    """
    from dataflows import add_computed_field

    rows = [{'x': value} for value in range(3)]
    square_field = dict(target=dict(name='sq', type='integer'),
                        operation=lambda row: row['x'] ** 2)
    format_field = dict(target='f', operation='format', with_='{x} - {x}')
    pipeline = Flow(rows, add_computed_field([square_field, format_field]))

    all_results, *_ = pipeline.results()
    computed = list(all_results[0])

    expected = [
        {'x': 0, 'sq': 0, 'f': '0 - 0'},
        {'x': 1, 'sq': 1, 'f': '1 - 1'},
        {'x': 2, 'sq': 4, 'f': '2 - 2'},
    ]
    assert computed == expected
def test_load_xml():
    """Load ``data/sample.xml`` and compare the parsed rows with the expected books."""
    from dataflows import Flow, load

    expected_books = [
        {'publication-year': 1954, 'title': 'The Fellowship of the Ring'},
        {'publication-year': 1954, 'title': 'The Two Towers'},
        {'publication-year': 1955, 'title': 'The Return of the King'},
    ]
    all_results, _dp, _stats = Flow(load('data/sample.xml')).results()
    assert all_results[0] == expected_books
def flow(parameters, datapackage, resources, stats):
    """Build a Flow that adds a constant ``'foo'`` column to the first resource.

    ``parameters['attr']`` names the new column (looked up lazily, when the
    flow runs). ``stats['foo_values']`` is reset to 0 here and incremented
    once for every row that receives the value. ``datapackage`` and
    ``resources`` are accepted but not used.
    """
    stats['foo_values'] = 0

    # NOTE(review): the parameter names ``package`` and ``row`` below are kept
    # as-is; dataflows presumably dispatches on them — verify before renaming.
    def add_foo_field(package: PackageWrapper):
        # Extend the first resource's schema in place, emit the package, then
        # re-yield everything the wrapper iterates (presumably resource rows).
        schema_fields = package.pkg.descriptor['resources'][0]['schema']['fields']
        schema_fields.append({'name': parameters['attr'], 'type': 'string'})
        yield package.pkg
        yield from package

    def add_foo_value(row):
        # Mutates the row in place; nothing is returned.
        row[parameters['attr']] = 'foo'
        stats['foo_values'] += 1

    return Flow(
        add_metadata(name='_'),
        hello_dataflows,
        add_foo_field,
        add_foo_value,
    )
def test_sort_rows():
    """Sort rows by the formatted string key ``'{b}{a}'``.

    The key concatenates ``b`` then ``a``, so with these single-digit values
    rows are ordered by ``b`` first and by ``a`` within equal ``b``.
    """
    from dataflows import sort_rows

    unsorted_rows = [
        {'a': 1, 'b': 3},
        {'a': 2, 'b': 3},
        {'a': 3, 'b': 1},
        {'a': 4, 'b': 1},
    ]
    expected = [
        {'a': 3, 'b': 1},
        {'a': 4, 'b': 1},
        {'a': 1, 'b': 3},
        {'a': 2, 'b': 3},
    ]

    pipeline = Flow(unsorted_rows, sort_rows(key='{b}{a}'))
    sorted_results, _dp, _stats = pipeline.results()
    assert list(sorted_results[0]) == expected
def test_example_2():
    """Run the example flow that title-cases the ``name`` column of beatles.csv.

    Besides checking that the flow runs, verify that the row function was
    actually applied to the loaded rows.
    """
    from dataflows import Flow, load

    def titleName(row):
        # Row function: rewrites ``name`` in place and returns nothing.
        row['name'] = row['name'].title()

    f = Flow(
        load('data/beatles.csv'),
        titleName
    )
    data, *_ = f.results()

    # Previously ``data`` was discarded unchecked. ``str.title`` is idempotent,
    # so every processed name must already equal its own title-cased form.
    assert all(row['name'] == row['name'].title() for row in data[0])
def test_load_from_env_var():
    """Load a dumped datapackage through an ``env://`` URL.

    The datapackage path is published in the ``MY_DATAPACKAGE`` environment
    variable for the duration of the load. The variable's previous state is
    restored afterwards, so the test does not leak state into other tests.
    """
    import os
    from dataflows import load, dump_to_path

    Flow(
        [{'foo': 'bar'}],
        dump_to_path('out/load_from_env_var')
    ).process()

    env_key = 'MY_DATAPACKAGE'
    # os.environ values are always str, so None means "was not set".
    previous = os.environ.get(env_key)
    os.environ[env_key] = 'out/load_from_env_var/datapackage.json'
    try:
        results, dp, _ = Flow(
            load('env://MY_DATAPACKAGE')
        ).results()
    finally:
        # Restore the prior environment even if the load fails.
        if previous is None:
            os.environ.pop(env_key, None)
        else:
            os.environ[env_key] = previous

    assert len(dp.resources) == 1
    assert results == [[{'foo': 'bar'}]]
def test_example_9():
    """Find "double winners": Emmy winners who also won an Academy Award.

    Emmy winners are filtered and concatenated into ``emmies_filtered``, then
    joined against the Oscars data by name. Only Oscar winners are kept, and
    the result is dumped to ``out/double_winners``.
    """
    from dataflows import Flow, load, dump_to_path, join, concatenate, filter_rows

    emmy_steps = [
        # Emmy award nominees and winners
        load('data/emmy.csv', name='emmies'),
        filter_rows(equals=[dict(winner=1)]),
        concatenate(
            dict(emmy_nominee=['nominee']),
            dict(name='emmies_filtered'),
            resources='emmies',
        ),
    ]
    oscar_steps = [
        # Academy award nominees and winners
        load('data/academy.csv', encoding='utf8', name='oscars'),
        join(
            'emmies_filtered', ['emmy_nominee'],  # source resource and key
            'oscars', ['Name'],                   # target resource and key
            full=False,  # don't add new fields; drop unmatched rows
        ),
        filter_rows(equals=[dict(Winner='1')]),
        dump_to_path('out/double_winners'),
    ]

    pipeline = Flow(*emmy_steps, *oscar_steps)
    _ = pipeline.process()
def test_exception_information_multiple_processors_function_error():
    """A failure inside a user rows function surfaces as ``ProcessorError``.

    The second processor (after ``load``) raises on its second row. The
    wrapping error must expose the original exception as ``cause``, together
    with the processor's name and its position in the flow.
    """
    from dataflows import Flow, load, exceptions

    # NOTE(review): the parameter name ``rows`` is kept as-is; the expected
    # processor_name below suggests dataflows classifies functions by it.
    def func(rows):
        # Pass the first row through, then fail on the second one.
        for row_number, row in enumerate(rows, start=1):
            if row_number == 2:
                raise MyException('custom-error')
            yield row

    failing_flow = Flow(load('data/academy.csv'), func)
    with pytest.raises(exceptions.ProcessorError) as excinfo:
        failing_flow.results()

    error = excinfo.value
    assert str(error.cause) == 'custom-error'
    assert error.processor_name == 'rows_processor'
    assert error.processor_position == 2