Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
dict(x=3, y='c', xx=True),
]
f = Flow(
data,
delete_fields(['x'])
)
results, dp, _ = f.results()
for i in results[0]:
assert list(i.keys()) == ['y']
assert dp.descriptor['resources'][0]['schema']['fields'] == \
[dict(name='y', type='string', format='default')]
f = Flow(
data,
data2,
delete_fields(['x+'])
)
results, dp, _ = f.results()
for res in results:
for i in res:
assert list(i.keys()) == ['y']
assert dp.descriptor['resources'][0]['schema']['fields'] == \
[dict(name='y', type='string', format='default')]
def test_delete_field():
from dataflows import delete_fields
data2 = [
dict(x=1, y='a', xx=True),
dict(x=2, y='b', xx=True),
dict(x=3, y='c', xx=True),
]
f = Flow(
data,
delete_fields(['x'])
)
results, dp, _ = f.results()
for i in results[0]:
assert list(i.keys()) == ['y']
assert dp.descriptor['resources'][0]['schema']['fields'] == \
[dict(name='y', type='string', format='default')]
f = Flow(
data,
data2,
delete_fields(['x+'])
)
results, dp, _ = f.results()
for res in results:
for i in res:
assert list(i.keys()) == ['y']
def test_delete_fields_regex():
from dataflows import load, delete_fields
flow = Flow(
load('data/regex.csv'),
delete_fields(['temperature (24h)'], regex=False),
)
data = flow.results()[0]
assert data == [[
{'city': 'london'},
{'city': 'paris'},
{'city': 'rome'},
]]
def flow(parameters):
resources = parameters.get('resources')
regex = parameters.get('regex', True)
return Flow(
delete_fields(
parameters.get('fields', []),
resources=resources,
regex=regex,
)
],
sources= [
{
"name": "Our Airports",
"path": "http://ourairports.com/data/",
"title": "Our Airports"
}
],
readme=readme()
),
add_computed_field([{
"operation": "format",
"target": "coordinates",
"with": "{latitude_deg}, {longitude_deg}"
}]),
delete_fields(fields=[
"id","longitude_deg","latitude_deg",
"scheduled_service","home_link","wikipedia_link","keywords"
]),
update_resource('airport-codes', **{'path':'data/airport-codes.csv'}),
validate(),
dump_to_path()
)
def flow(parameters, datapackage, resources, stats):
return dialing_info_cldr
if __name__ == '__main__':
dialing_info_cldr.process()
def flow(parameters):
resources = parameters.get('resources')
regex = parameters.get('regex', True)
if 'types' in parameters:
return Flow(
*[
set_type(name, resources=resources, regex=regex, **options)
if options is not None else
delete_fields([name], resources=resources)
for name, options in parameters['types'].items()
]
)
else:
return Flow(
validate()
)