# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_join_row_number_format_string():
    """Join using the row-number placeholder ('#') inside a key format string.

    The source key embeds the row number via '{#}', and the target key is the
    literal ``comment`` field, so rows match when a comment like
    'city with population in row 2' names the source row's position.
    """
    # NOTE: `set_type` was imported but never used; removed.
    from dataflows import load, join

    flow = Flow(
        load('data/population.csv'),
        load('data/cities_comment.csv'),
        join(
            source_name='population',
            source_key='city with population in row {#}',
            target_name='cities_comment',
            target_key='{comment}',
            fields={'population': {'name': 'population'}}
        ),
    )
    data = flow.results()[0]
    assert data == [[
        {'city': 'paris', 'population': 2, 'comment': 'city with population in row 2'},
        {'city': 'london', 'population': 8, 'comment': 'city with population in row 1'},
        {'city': 'rome', 'population': 3, 'comment': 'city with population in row 3'},
    ]]
def test_join_row_number_readme_example():
    """README example: join two resources positionally via the '#' key.

    Using ['#'] as both source and target key pairs rows by row number,
    merging the ``values`` column into the ``names`` resource.
    """
    # NOTE: `set_type` was imported but never used; removed.
    from dataflows import load, join

    flow = Flow(
        load('data/values.csv'),
        load('data/names.csv'),
        join(
            source_name='values',
            source_key=['#'],
            target_name='names',
            target_key=['#'],
            fields={'values': {'name': 'values'}}
        ),
    )
    data = flow.results()[0]
    assert data == [[
        {'id': 1, 'names': 'name1', 'values': 'value1'},
        {'id': 2, 'names': 'name2', 'values': 'value2'},
    ]]
def test_example_9():
    """Docs example: find nominees who won both an Emmy and an Academy Award.

    Filters Emmy winners, normalizes the nominee column, then inner-joins
    against the Oscars data and keeps only Oscar winners, dumping the result
    to ``out/double_winners``.
    """
    from dataflows import Flow, load, dump_to_path, join, concatenate, filter_rows

    double_winners = Flow(
        # Emmy award nominees and winners
        load('data/emmy.csv', name='emmies'),
        filter_rows(equals=[dict(winner=1)]),
        concatenate(
            dict(emmy_nominee=['nominee']),
            dict(name='emmies_filtered'),
            resources='emmies',
        ),
        # Academy award nominees and winners
        load('data/academy.csv', encoding='utf8', name='oscars'),
        join(
            'emmies_filtered', ['emmy_nominee'],  # Source resource
            'oscars', ['Name'],                   # Target resource
            full=False,                           # Don't add new fields, remove unmatched rows
        ),
        filter_rows(equals=[dict(Winner='1')]),
        dump_to_path('out/double_winners'),
    )
    _ = double_winners.process()
]
houses = [
{'house': 'House of Lannister'},
{'house': 'House of Greyjoy'},
{'house': 'House of Stark'},
{'house': 'House of Targaryen'},
{'house': 'House of Martell'},
{'house': 'House of Tyrell'},
]
res, _, _ = Flow(
characters,
set_type('age', type='number'),
houses,
join(
'res_1',
'House of {house}',
'res_2',
'{house}',
dict(
max_age={
'name': 'age',
'aggregate': 'max'
},
avg_age={
'name': 'age',
'aggregate': 'avg'
},
representative={
'name': 'first_name',
'aggregate': 'last'
def test_join_full_outer():
    """Full-outer join keeps unmatched rows from both resources.

    Rows present only in the target get ``None`` for joined fields, and rows
    present only in the source are appended with ``None`` target fields.
    """
    # NOTE: `set_type` was imported but never used; removed.
    from dataflows import load, join

    flow = Flow(
        load('data/population.csv'),
        load('data/cities.csv'),
        join(
            source_name='population',
            source_key=['id'],
            target_name='cities',
            target_key=['id'],
            fields={'population': {'name': 'population'}},
            mode='full-outer',
        ),
    )
    data = flow.results()[0]
    assert data == [[
        {'id': 1, 'city': 'london', 'population': 8},
        {'id': 2, 'city': 'paris', 'population': 2},
        {'id': 3, 'city': 'rome', 'population': None},
        {'id': 4, 'city': None, 'population': 3},
    ]]
def test_join_preserve_source_fields_order():
from dataflows import load, join
flow = Flow(
load('data/cities_metadata.csv'),
load('data/cities.csv'),
join(
source_name='cities_metadata',
source_key='{id}',
target_name='cities',
target_key='{id}',
fields={'key1': {'name': 'key1'}, 'key2': {'name': 'key2'}}
),
)
data, package, stats = flow.results()
assert package.descriptor['resources'][0]['schema']['fields'] == [
{'name': 'id', 'type': 'integer', 'format': 'default'},
{'name': 'city', 'type': 'string', 'format': 'default'},
{'name': 'key2', 'type': 'string', 'format': 'default'},
{'name': 'key1', 'type': 'string', 'format': 'default'}
]
assert data == [[
{'id': 1, 'city': 'london', 'key1': 'val1', 'key2': 'val2'},
def test_join_row_number():
    """Join two resources by row number using the special ['#'] key."""
    # NOTE: `set_type` was imported but never used; removed.
    from dataflows import load, join

    flow = Flow(
        load('data/population.csv'),
        load('data/cities.csv'),
        join(
            source_name='population',
            source_key=['#'],
            target_name='cities',
            target_key=['#'],
            fields={'population': {'name': 'population'}}
        ),
    )
    data = flow.results()[0]
    assert data == [[
        {'id': 1, 'city': 'london', 'population': 8},
        {'id': 2, 'city': 'paris', 'population': 2},
        {'id': 3, 'city': 'rome', 'population': 3},
    ]]
def flow(parameters):
source = parameters['source']
target = parameters['target']
return Flow(
load_lazy_json(source['name']),
join(
source['name'],
source['key'],
target['name'],
target['key'],
parameters['fields'],
parameters.get('full', True),
source.get('delete', False)
),
update_resource(
target['name'],
**{
PROP_STREAMING: True
}