# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_load_from_env_var():
    """load() resolves an 'env://VAR' source via the named environment variable."""
    import os
    # Import Flow explicitly, consistent with the other tests in this file.
    from dataflows import Flow, load, dump_to_path

    # First write a datapackage to disk so there is something to load back.
    Flow(
        [{'foo': 'bar'}],
        dump_to_path('out/load_from_env_var')
    ).process()

    os.environ['MY_DATAPACKAGE'] = 'out/load_from_env_var/datapackage.json'
    try:
        results, dp, _ = Flow(
            load('env://MY_DATAPACKAGE')
        ).results()
        assert len(dp.resources) == 1
        assert results == [[{'foo': 'bar'}]]
    finally:
        # Clean up so the variable does not leak into other tests.
        del os.environ['MY_DATAPACKAGE']
def test_example_9():
    """Cross-reference Emmy winners with Academy nominees to find double winners."""
    from dataflows import Flow, load, dump_to_path, join, concatenate, filter_rows

    steps = [
        # Emmy award nominees and winners
        load('data/emmy.csv', name='emmies'),
        filter_rows(equals=[dict(winner=1)]),
        concatenate(
            dict(emmy_nominee=['nominee']),
            dict(name='emmies_filtered'),
            resources='emmies',
        ),
        # Academy award nominees and winners
        load('data/academy.csv', encoding='utf8', name='oscars'),
        join(
            'emmies_filtered', ['emmy_nominee'],  # Source resource
            'oscars', ['Name'],                   # Target resource
            full=False,  # Don't add new fields, remove unmatched rows
        ),
        filter_rows(equals=[dict(Winner='1')]),
        dump_to_path('out/double_winners'),
    ]
    _ = Flow(*steps).process()
def test_exception_information_multiple_processors_function_error():
    """An error raised inside a row processor surfaces as ProcessorError with its position."""
    from dataflows import Flow, load, exceptions

    def failing_rows(rows):
        # Emit the first row, then raise on the second one.
        seen = 0
        for row in rows:
            if seen == 1:
                raise MyException('custom-error')
            seen += 1
            yield row

    flow = Flow(
        load('data/academy.csv'),
        failing_rows,
    )
    with pytest.raises(exceptions.ProcessorError) as excinfo:
        flow.results()
    assert str(excinfo.value.cause) == 'custom-error'
    assert excinfo.value.processor_name == 'rows_processor'
    assert excinfo.value.processor_position == 2
def test_set_type_regex():
    """set_type with regex=False matches the field name literally (special chars and all)."""
    from dataflows import load, set_type

    results = Flow(
        load('data/regex.csv'),
        set_type('city', type='string'),
        set_type('temperature (24h)', type='integer', regex=False),
    ).results()
    expected = [
        {'city': 'london', 'temperature (24h)': 23},
        {'city': 'paris', 'temperature (24h)': 26},
        {'city': 'rome', 'temperature (24h)': 21},
    ]
    assert results[0] == [expected]
# NOTE(review): this span looks like the interior of a test function whose
# `def` line is outside the visible chunk — confirm against the full file.
def ensure_type(t):
    # Build a row processor asserting that column 'a' is an instance of *t*.
    def func(row):
        assert isinstance(row['a'], t)
    return func

# Default load: the processor sees string values, while the returned results
# are cast per the inferred schema — presumably cast happens on output; confirm.
results, dp, stats = Flow(load('data/empty_headers.csv'),
                          ensure_type(str)).results()
assert results[0] == [
    {'a': 1, 'b': 2},
    {'a': 2, 'b': 3},
    {'a': 3, 'b': 4},
    {'a': 5, 'b': 6}
]
# Only 'a' and 'b' survive — presumably blank-header columns are dropped.
assert len(dp.resources[0].schema.fields) == 2

# validate=True: values are cast during the flow, so the processor sees ints.
results, dp, stats = Flow(load('data/empty_headers.csv', validate=True),
                          ensure_type(int)).results()
assert results[0] == [
    {'a': 1, 'b': 2},
    {'a': 2, 'b': 3},
    {'a': 3, 'b': 4},
    {'a': 5, 'b': 6}
]

# force_strings=True: values stay strings both in-flow and in the results.
results, dp, stats = Flow(load('data/empty_headers.csv', force_strings=True),
                          ensure_type(str)).results()
assert results[0] == [
    {'a': '1', 'b': '2'},
    {'a': '2', 'b': '3'},
    {'a': '3', 'b': '4'},
    {'a': '5', 'b': '6'}
]
def test_exception_information_multiple_processors_last_errored():
    """A load() failure in the second processor reports its name, object and position."""
    from dataflows import Flow, load, exceptions

    flow = Flow(
        load('data/academy.csv'),
        load('data/bad-path2.csv'),
    )
    with pytest.raises(exceptions.ProcessorError) as excinfo:
        flow.results()
    # The full message embeds reprs of option values (e.g. parser functions),
    # which vary by run and library version — the original exact-equality
    # comparison (containing a truncated "{'xml': }" repr) could never match.
    # Assert on the stable parts of the message instead.
    cause = str(excinfo.value.cause)
    assert "Failed to load source 'data/bad-path2.csv'" in cause
    assert "No such file or directory: 'data/bad-path2.csv'" in cause
    assert excinfo.value.processor_name == 'load'
    assert excinfo.value.processor_object.load_source == 'data/bad-path2.csv'
    assert excinfo.value.processor_position == 2
emmy_winners = set(
map(lambda x: x['nominee'],
filter(lambda x: x['winner'],
emmy))
)
# Oscars are next - filter rows based on the emmy winner set
academy = next(resources)
yield filter(lambda row: row['Winner'] and row['Name'] in emmy_winners,
academy)
f = Flow(
# Emmy award nominees and winners
load('data/emmy.csv', name='emmies'),
# Academy award nominees and winners
load('data/academy.csv', encoding='utf8', name='oscars'),
find_double_winners,
dump_to_path('out/double_winners')
)
_ = f.process()
def test_join_row_number():
    """join() supports the special '#' key (row number) for aligning two resources."""
    from dataflows import load, set_type, join

    f = Flow(
        load('data/population.csv'),
        load('data/cities.csv'),
        join(
            source_name='population',
            source_key=['#'],
            target_name='cities',
            target_key=['#'],
            fields={'population': {'name': 'population'}},
        ),
    )
    data, _, _ = f.results()
    expected = [
        {'id': 1, 'city': 'london', 'population': 8},
        {'id': 2, 'city': 'paris', 'population': 2},
        {'id': 3, 'city': 'rome', 'population': 3},
    ]
    assert data == [expected]
def test_sort_rows_decimal():
    """sort_rows orders Decimal values numerically when loaded with schema casting."""
    from decimal import Decimal
    from dataflows import sort_rows, load

    results, dp, _ = Flow(
        load('data/numbers.csv', cast_strategy=load.CAST_WITH_SCHEMA),
        sort_rows(key='{a}'),
    ).results()
    # Numeric (not lexicographic) ordering of the Decimal column.
    expected_order = ['-1000', '-0.5', '-0.4', '0', '1.1', '2', '10', '1000']
    assert list(results[0]) == [{'a': Decimal(v)} for v in expected_order]
def spew_flow(flow, ctx: ProcessorContext) -> None:
    """Run *flow* against a processor context, writing the results back onto it.

    Wraps *flow* so it consumes the context's incoming datapackage and
    resource iterator, then mutates *ctx* in place with the resulting
    datastream's descriptor, resource iterator and merged stats.
    Returns nothing.
    """
    flow = Flow(
        # Propagate the incoming datapackage metadata first.
        update_package(**ctx.datapackage),
        # Feed the incoming resources into the wrapped flow.
        load((ctx.datapackage, ctx.resource_iterator)),
        flow,
    )
    datastream = flow.datastream()
    ctx.datapackage = datastream.dp.descriptor
    ctx.resource_iterator = datastream.res_iter
    # Combine this flow's stats with any stats already held by the context.
    ctx.stats = MergeableStats(datastream.stats, ctx.stats)