Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_example_4():
    """Run the country-population source through a ``set_type`` step.

    The ``population`` field is declared as ``number`` with ``groupChar=','``.
    The flow results are unpacked but not inspected, so the test passes as long
    as the flow runs without raising.
    """
    from dataflows import Flow, set_type

    source = country_population()
    population_as_number = set_type('population', type='number', groupChar=',')
    pipeline = Flow(source, population_as_number)
    data, dp, _ = pipeline.results()
def test_dump_to_path_use_titles():
    """Dump a resource with ``use_titles=True`` and check the written CSV.

    Both fields of ``res_1`` receive a title through ``set_type``.  The CSV file
    produced by ``dump_to_path`` is then read back with ``tabulator`` and is
    expected to start with a row of those titles, followed by the data rows.
    """
    from dataflows import Flow, dump_to_path, set_type
    import tabulator

    rows = [
        {'hello': 'world', 'hola': 'mundo'},
        {'hello': 'עולם', 'hola': 'عالم'},
    ]
    field_titles = (('hello', 'שלום'), ('hola', 'aloha'))
    # One set_type step per (field name, title) pair, in the order listed above.
    title_steps = [
        set_type(field, resources=['res_1'], title=label)
        for field, label in field_titles
    ]
    dumper = dump_to_path('out/dump_with_titles', use_titles=True)
    Flow(rows, *title_steps, dumper).process()

    expected = [
        ['שלום', 'aloha'],
        ['world', 'mundo'],
        ['עולם', 'عالم'],
    ]
    with tabulator.Stream('out/dump_with_titles/res_1.csv') as stream:
        assert stream.read() == expected
def test_set_type_regex():
    """Check ``set_type`` with ``regex=False`` on a name containing parentheses.

    Loads ``data/regex.csv``, declares ``city`` as a string and
    ``temperature (24h)`` as an integer, and compares the single resulting
    resource against the three expected rows.
    """
    # Flow is imported here as well, as in the sibling tests, so this test does
    # not rely on a module-level import that may not exist.
    from dataflows import Flow, load, set_type

    flow = Flow(
        load('data/regex.csv'),
        set_type('city', type='string'),
        # regex=False: the field name holds '(' and ')', which would otherwise
        # be read as regular-expression syntax when matching field names.
        set_type('temperature (24h)', type='integer', regex=False),
    )
    # results() returns (data, datapackage, stats); only the data is checked.
    data = flow.results()[0]
    assert data == [[
        {'city': 'london', 'temperature (24h)': 23},
        {'city': 'paris', 'temperature (24h)': 26},
        {'city': 'rome', 'temperature (24h)': 21},
    ]]
def test_set_type_resources():
    """Apply ``set_type`` to selected resources and check converted values.

    Three inline ten-row resources hold string values under the keys ``a``,
    ``b`` and ``c``.  Each ``set_type`` step passes a different form of the
    ``resources`` argument: a string pattern, a list of names and ``-1``.
    """
    from dataflows import Flow, set_type, validate

    rows_a = [{'a': str(n)} for n in range(10)]
    rows_b = [{'b': str(n)} for n in range(10)]
    rows_c = [{'c': '0_' + str(n)} for n in range(10)]

    pipeline = Flow(
        rows_a,
        rows_b,
        rows_c,
        set_type('a', resources='res_[1]', type='integer'),
        set_type('b', resources=['res_2'], type='integer'),
        set_type('[cd]', resources=-1, type='number', groupChar='_'),
        validate(),
    )
    results, dp, stats = pipeline.results()
    print(dp.descriptor)

    first, second, third = results[0], results[1], results[2]
    assert first[1]['a'] == 1
    assert second[3]['b'] == 3
    assert third[8]['c'] == 8.0
def test_save_load_dates():
    """Dump rows with a custom datetime format, then load the package back.

    Two rows carrying ``datetime.datetime.now()`` values are typed as
    ``datetime`` with the format ``%Y-%m-%d/%H:%M:%S`` and written to
    ``out/test_save_load_dates``.  The resulting ``datapackage.json`` is then
    loaded and printed; the loaded rows are not inspected here.
    """
    from dataflows import Flow, dump_to_path, load, set_type, printer
    import datetime

    rows = [
        {'id': 1, 'ts': datetime.datetime.now()},
        {'id': 2, 'ts': datetime.datetime.now()},
    ]
    ts_as_datetime = set_type(
        'ts', type='datetime', format='%Y-%m-%d/%H:%M:%S'
    )
    writer = dump_to_path('out/test_save_load_dates')
    Flow(rows, ts_as_datetime, writer).process()

    reader = load('out/test_save_load_dates/datapackage.json')
    res, _, _ = Flow(reader, printer()).results()
def run_flow(datetime_format=None):
    """Dump one row built from ``_today`` and ``_now`` to ``out/dump_dates``.

    The row holds ``str(_today)`` and ``str(_now)``; ``today`` is typed as
    ``date`` and ``now`` as ``datetime`` using ``datetime_format``.

    ``_today``, ``_now``, ``Flow``, ``set_type`` and ``dump_to_path`` are not
    defined in this function and must come from the surrounding scope.
    """
    row = {'today': str(_today), 'now': str(_now)}
    steps = (
        set_type('today', type='date'),
        set_type('now', type='datetime', format=datetime_format),
        dump_to_path('out/dump_dates'),
    )
    Flow([row], *steps).process()
# NOTE(review): this run of statements uses names bound outside the visible
# fragment (`data`, `ignore`, `on_error`, `on_error_fields`, `Flow`,
# `set_type`, `validate`) - presumably it is the body of an enclosing test
# function; confirm against the full file.
#
# All three sections below run the same shape of flow: `b` is cast to integer
# with `on_error=ignore`, `validate` is given a freshly created handler, and
# the state recorded on that handler is checked afterwards.

# validate() called with only a handler; this handler also records the field
handler = on_error_fields()
res, *_ = Flow(
data,
set_type('b', type='integer', on_error=ignore),
validate(on_error=handler)
).results()
# Three rows are expected in the first resource of the results
assert len(res[0]) == 3
assert handler.bad_row == {'a': 4, 'b': 'a'}
assert handler.bad_index == 3
assert handler.bad_field.name == 'b'
# Field validator: a field name plus a predicate on that field's value
handler = on_error()
res, *_ = Flow(
data,
set_type('b', type='integer', on_error=ignore),
validate('a', lambda v: v < 4, on_error=handler)
).results()
assert len(res[0]) == 3
assert handler.bad_row == {'a': 4, 'b': 'a'}
assert handler.bad_index == 3
# Row validator: a single predicate that indexes its argument by field name
handler = on_error()
res, *_ = Flow(
data,
set_type('b', type='integer', on_error=ignore),
validate(lambda v: v['a'] < 4, on_error=handler)
).results()
assert len(res[0]) == 3
assert handler.bad_row == {'a': 4, 'b': 'a'}
assert handler.bad_index == 3
def run_flow(datetime_format=None):
    """Dump one row built from ``_today`` and ``_now`` to ``out/dump_dates``.

    The row holds ``str(_today)`` and ``str(_now)``; ``today`` is typed as
    ``date`` and ``now`` as ``datetime`` using ``datetime_format``.

    ``_today``, ``_now``, ``Flow``, ``set_type`` and ``dump_to_path`` are not
    defined in this function and must come from the surrounding scope.
    """
    row = {'today': str(_today), 'now': str(_now)}
    steps = (
        set_type('today', type='date'),
        set_type('now', type='datetime', format=datetime_format),
        dump_to_path('out/dump_dates'),
    )
    Flow([row], *steps).process()
def test_example_5():
    """Type the country-population data and dump it to disk.

    The ``population`` field is declared as ``number`` with ``groupChar=','``
    and the output is written to ``out/country_population``.  The return value
    of ``process()`` is discarded.
    """
    from dataflows import Flow, set_type, dump_to_path

    steps = [
        country_population(),
        set_type('population', type='number', groupChar=','),
        dump_to_path('out/country_population'),
    ]
    f = Flow(*steps)
    _ = f.process()
format='xls',
sheet=2,
skip_rows=[1,2,3,-1],
headers=['Date', 'Price'],
name='daily'
),
load(
load_source='http://www.eia.gov/dnav/ng/hist_xls/RNGWHHDm.xls',
format='xls',
sheet=2,
skip_rows=[1,2,3,-1],
headers=['Month', 'Price'],
name='monthly'
),
format_date,
set_type('Date', resources='daily', type='date'),
set_type('Month',resources='monthly', type='yearmonth'),
update_resource('daily', **{'path':'data/daily.csv', 'dpp:streaming': True}),
update_resource('monthly', **{'path':'data/monthly.csv', 'dpp:streaming': True}),
validate()
)
def flow(parameters, datapackage, resources, stats):
    """Return the module-level ``natural_gas`` flow.

    None of the four arguments is used by this function.
    NOTE(review): the signature is presumably dictated by whatever calls
    ``flow`` - confirm against the caller.
    """
    return natural_gas
# Process the natural_gas flow when this file is executed directly as a script.
if __name__ == '__main__':
    natural_gas.process()