Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class on_error_fields:
    """Error callback that remembers the most recent validation failure.

    Instances are passed as ``on_error=`` to ``validate``. Each call stores
    the failing row, its index and the offending field on the instance.

    Returning ``False`` appears to make ``validate`` leave the row out of
    the output instead of raising. The assertions in this file expect one
    fewer row after a failure; confirm against the dataflows docs.
    """

    def __init__(self):
        # Nothing has failed yet; every slot starts empty.
        self.bad_field = None
        self.bad_index = None
        self.bad_row = None

    def __call__(self, name, row, i, e, field):
        """Record ``row``, ``i`` and ``field`` from a failed check.

        ``name`` and ``e`` are accepted to match the callback signature but
        are not used. Always returns ``False``.
        """
        self.bad_field = field
        self.bad_row = row
        self.bad_index = i
        return False
# Schema validator
handler = on_error()
res, *_ = Flow(
data,
set_type('b', type='integer', on_error=ignore),
validate(on_error=handler)
).results()
assert len(res[0]) == 3
assert handler.bad_row == {'a': 4, 'b': 'a'}
assert handler.bad_index == 3
# Schema validator with fields
handler = on_error_fields()
res, *_ = Flow(
data,
set_type('b', type='integer', on_error=ignore),
validate(on_error=handler)
).results()
assert len(res[0]) == 3
assert handler.bad_row == {'a': 4, 'b': 'a'}
assert handler.bad_index == 3
assert handler.bad_field.name == 'b'
handler = on_error()
res, *_ = Flow(
data,
set_type('b', type='integer', on_error=ignore),
validate('a', lambda v: v < 4, on_error=handler)
).results()
assert len(res[0]) == 3
assert handler.bad_row == {'a': 4, 'b': 'a'}
assert handler.bad_index == 3
# Row validator
handler = on_error()
res, *_ = Flow(
data,
set_type('b', type='integer', on_error=ignore),
validate(lambda v: v['a'] < 4, on_error=handler)
).results()
assert len(res[0]) == 3
assert handler.bad_row == {'a': 4, 'b': 'a'}
assert handler.bad_index == 3
def test_update_schema():
    """A missingValues entry added via update_schema becomes None on output.

    ``update_schema(-1, ...)`` presumably targets the last (here: only)
    resource; after validation the '-' cell is expected to read as None while
    the 0 cell is kept.
    """
    from dataflows import Flow, printer, update_schema, validate

    rows = [['a', '-'], ['a', 0]]
    pipeline = Flow(
        rows,
        update_schema(-1, missingValues=['-']),
        validate(),
        printer(),
    )
    results, dp, _stats = pipeline.results()
    print(dp.descriptor)

    expected = [
        dict(col0='a', col1=None),
        dict(col0='a', col1=0),
    ]
    assert results[0] == expected
def test_set_type_resources():
    """set_type can select resources by name pattern, name list or index.

    Three unnamed resources are created. Their auto-generated names are
    presumably res_1, res_2 and res_3, which is what the selectors below rely
    on.
    """
    from dataflows import Flow, set_type, validate

    col_a = [dict(a=str(i)) for i in range(10)]
    col_b = [dict(b=str(i)) for i in range(10)]
    # Values like '0_7': '_' is declared as the grouping character below.
    col_c = [dict(c='0_' + str(i)) for i in range(10)]

    pipeline = Flow(
        col_a,
        col_b,
        col_c,
        set_type('a', resources='res_[1]', type='integer'),
        set_type('b', resources=['res_2'], type='integer'),
        # '[cd]' is a field-name pattern; resources=-1 selects by index.
        set_type('[cd]', resources=-1, type='number', groupChar='_'),
        validate(),
    )
    results, dp, _stats = pipeline.results()
    print(dp.descriptor)

    assert results[0][1]['a'] == 1
    assert results[1][3]['b'] == 3
    assert results[2][8]['c'] == 8.0
res, *_ = Flow(
data,
set_type('b', type='integer', on_error=ignore),
validate(on_error=handler)
).results()
assert len(res[0]) == 3
assert handler.bad_row == {'a': 4, 'b': 'a'}
assert handler.bad_index == 3
assert handler.bad_field.name == 'b'
# Field validator
handler = on_error()
res, *_ = Flow(
data,
set_type('b', type='integer', on_error=ignore),
validate('a', lambda v: v < 4, on_error=handler)
).results()
assert len(res[0]) == 3
assert handler.bad_row == {'a': 4, 'b': 'a'}
assert handler.bad_index == 3
# Row validator
handler = on_error()
res, *_ = Flow(
data,
set_type('b', type='integer', on_error=ignore),
validate(lambda v: v['a'] < 4, on_error=handler)
).results()
assert len(res[0]) == 3
assert handler.bad_row == {'a': 4, 'b': 'a'}
assert handler.bad_index == 3
def test_validate():
    """Rows altered after set_type must fail a later validate step.

    ``adder`` turns each integer into a string such as '0.5', which no longer
    fits the declared integer type. The test expects processing to raise
    ``ProcessorError`` whose ``cause`` is a ``ValidationError``.
    """
    from dataflows import Flow, validate, set_type, printer, ValidationError, exceptions

    def adder(row):
        # 0 -> '0.5', 1 -> '1.5', ...: deliberately breaks the integer type.
        bumped = row['a'] + 0.5
        row['a'] = str(bumped)

    source = (dict(a=x) for x in range(10))
    pipeline = Flow(source, set_type('a', type='integer'), adder, validate(), printer())

    with pytest.raises(exceptions.ProcessorError) as excinfo:
        pipeline.process()
    assert isinstance(excinfo.value.cause, ValidationError)
"name": "graph",
"title": "VIX - CBOE Volatility Index",
"specType": "simple",
"spec": {"type": "line","group": "Date","series": ["VIX Close"]}
}
],
readme=readme()
),
load(
load_source='http://www.cboe.com/publish/ScheduledTask/MktData/datahouse/vixcurrent.csv',
headers=2,
name='vix-daily'
),
set_type('Date', type='date', format='any'),
update_resource('vix-daily', **{'title': 'VIX Daily', 'path':'data/vix-daily.csv', 'dpp:streaming': True}),
validate()
)
def flow(parameters, datapackage, resources, stats):
    """Return the module-level ``finance_vix`` Flow.

    The four-argument signature looks like a datapackage-pipelines flow
    entry point (confirm). None of the arguments are used.
    """
    pipeline = finance_vix
    return pipeline


if __name__ == '__main__':
    # Running the module directly executes the pipeline.
    finance_vix.process()
name='daily'
),
load(
load_source='http://www.eia.gov/dnav/ng/hist_xls/RNGWHHDm.xls',
format='xls',
sheet=2,
skip_rows=[1,2,3,-1],
headers=['Month', 'Price'],
name='monthly'
),
format_date,
set_type('Date', resources='daily', type='date'),
set_type('Month',resources='monthly', type='yearmonth'),
update_resource('daily', **{'path':'data/daily.csv', 'dpp:streaming': True}),
update_resource('monthly', **{'path':'data/monthly.csv', 'dpp:streaming': True}),
validate()
)
def flow(parameters, datapackage, resources, stats):
    """Return the module-level ``natural_gas`` Flow.

    The four-argument signature looks like a datapackage-pipelines flow
    entry point (confirm). None of the arguments are used.
    """
    pipeline = natural_gas
    return pipeline


if __name__ == '__main__':
    # Running the module directly executes the pipeline.
    natural_gas.process()
"title": "Our Airports"
}
],
readme=readme()
),
add_computed_field([{
"operation": "format",
"target": "coordinates",
"with": "{latitude_deg}, {longitude_deg}"
}]),
delete_fields(fields=[
"id","longitude_deg","latitude_deg",
"scheduled_service","home_link","wikipedia_link","keywords"
]),
update_resource('airport-codes', **{'path':'data/airport-codes.csv'}),
validate(),
dump_to_path()
)
def flow(parameters, datapackage, resources, stats):
    """Return the module-level ``dialing_info_cldr`` Flow.

    The four-argument signature looks like a datapackage-pipelines flow
    entry point (confirm). None of the arguments are used.
    """
    pipeline = dialing_info_cldr
    return pipeline


if __name__ == '__main__':
    # Running the module directly executes the pipeline.
    dialing_info_cldr.process()
def flow(parameters):
    """Build a Flow that retypes or drops fields, or only validates.

    Reads from ``parameters`` (a mapping):
      * ``resources``: forwarded unchanged to every step (default None).
      * ``regex``: forwarded to ``set_type`` (default True).
      * ``types``: mapping of field name -> options dict, or None.
        A dict becomes ``set_type(name, **options)``; None becomes
        ``delete_fields([name])``.

    When ``types`` is absent, the result is a single ``validate()`` step.
    """
    resources = parameters.get('resources')
    regex = parameters.get('regex', True)

    if 'types' not in parameters:
        return Flow(validate())

    steps = []
    for name, options in parameters['types'].items():
        if options is None:
            # NOTE(review): `regex` is not forwarded here, so `regex: false`
            # only affects set_type steps. Confirm this asymmetry is intended.
            steps.append(delete_fields([name], resources=resources))
        else:
            steps.append(set_type(name, resources=resources, regex=regex, **options))
    return Flow(*steps)