How to use the dataflows.Flow function in dataflows

To help you get started, we've selected a few dataflows examples based on popular ways the library is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github datahq / dataflows / tests / test_lib.py View on Github external
# NOTE(review): truncated fragment — the enclosing test's `def` line and the
# `data` fixture are outside this view, so the indentation here is as scraped.
data2 = [
        dict(x=1, y='a', xx=True),
        dict(x=2, y='b', xx=True),
        dict(x=3, y='c', xx=True),
    ]
    # delete_fields(['x']) removes the field named 'x'; only 'y' survives.
    f = Flow(
        data,
        delete_fields(['x'])
    )
    results, dp, _ = f.results()
    for i in results[0]:
        assert list(i.keys()) == ['y']
    # The datapackage schema must reflect the deletion as well.
    assert dp.descriptor['resources'][0]['schema']['fields'] == \
        [dict(name='y', type='string', format='default')]

    # 'x+' presumably acts as a regex matching both 'x' and 'xx' (both rows end
    # up with only 'y') — TODO confirm against the delete_fields documentation.
    f = Flow(
        data,
        data2,
        delete_fields(['x+'])
    )
    results, dp, _ = f.results()
    for res in results:
        for i in res:
            assert list(i.keys()) == ['y']
    assert dp.descriptor['resources'][0]['schema']['fields'] == \
        [dict(name='y', type='string', format='default')]
github datahq / dataflows / tests / test_lib.py View on Github external
def test_load_from_package_resource_matching():
    """Loading a saved package with resources=[...] keeps only the named resource."""
    from dataflows import dump_to_path, load

    # First write a two-resource package to disk.
    Flow(
        [{'foo': 'bar'}],
        [{'foo': 'baz'}],
        dump_to_path('out/load_from_package_resource_matching(')
    ).process()

    # Reload it, selecting only the second resource, and inspect the raw stream.
    stream = Flow(
        load('out/load_from_package_resource_matching(/datapackage.json',
             resources=['res_2'])
    ).datastream()

    assert len(stream.dp.resources) == 1
    assert [list(resource) for resource in stream.res_iter] == [[{'foo': 'baz'}]]
github datahq / dataflows / tests / test_lib.py View on Github external
def test_add_computed_field_func():
    """add_computed_field supports both a callable and a named 'format' operation."""
    from dataflows import add_computed_field

    rows = [{'x': value} for value in range(3)]

    flow = Flow(
        rows,
        add_computed_field([
            # 'sq' is computed by an arbitrary Python callable.
            {'target': {'name': 'sq', 'type': 'integer'},
             'operation': lambda row: row['x'] ** 2},
            # 'f' is computed by the built-in 'format' operation.
            {'target': 'f', 'operation': 'format', 'with_': '{x} - {x}'},
        ])
    )
    results, *_ = flow.results()
    computed = list(results[0])

    assert computed == [
        {'x': 0, 'sq': 0, 'f': '0 - 0'},
        {'x': 1, 'sq': 1, 'f': '1 - 1'},
        {'x': 2, 'sq': 4, 'f': '2 - 2'},
    ]
github datahq / dataflows / tests / test_lib.py View on Github external
def test_load_xml():
    """load() parses rows (typed values included) out of an XML source file."""
    from dataflows import Flow, load

    results, dp, stats = Flow(load('data/sample.xml')).results()

    expected = [
        {'publication-year': 1954, 'title': 'The Fellowship of the Ring'},
        {'publication-year': 1954, 'title': 'The Two Towers'},
        {'publication-year': 1955, 'title': 'The Return of the King'},
    ]
    assert results[0] == expected
github frictionlessdata / datapackage-pipelines / tests / cli / test_flow.py View on Github external
def flow(parameters, datapackage, resources, stats):
    """Pipeline entry point: add a string field named parameters['attr'] to the
    first resource's schema and fill it with 'foo' on every row, counting the
    rows written into stats['foo_values'].

    `datapackage` and `resources` are part of the pipeline calling convention
    and are unused here; `hello_dataflows` is defined elsewhere in the project.
    """
    stats['foo_values'] = 0

    def append_attr_field(package: PackageWrapper):
        # Extend the first resource's schema, then pass everything through.
        new_field = {'name': parameters['attr'], 'type': 'string'}
        package.pkg.descriptor['resources'][0]['schema']['fields'] += [new_field]
        yield package.pkg
        yield from package

    def fill_attr_value(row):
        row[parameters['attr']] = 'foo'
        stats['foo_values'] += 1

    return Flow(add_metadata(name='_'),
                hello_dataflows,
                append_attr_field,
                fill_attr_value)
github datahq / dataflows / tests / test_lib.py View on Github external
def test_sort_rows():
    """sort_rows orders rows lexicographically by the formatted key '{b}{a}'."""
    from dataflows import sort_rows

    unsorted_rows = [
        {'a': 1, 'b': 3},
        {'a': 2, 'b': 3},
        {'a': 3, 'b': 1},
        {'a': 4, 'b': 1},
    ]
    flow = Flow(unsorted_rows, sort_rows(key='{b}{a}'))
    results, _, _ = flow.results()

    # b == 1 rows come first; ties are broken by a.
    assert list(results[0]) == [
        {'a': 3, 'b': 1},
        {'a': 4, 'b': 1},
        {'a': 1, 'b': 3},
        {'a': 2, 'b': 3},
    ]
github datahq / dataflows / tests / test_examples.py View on Github external
def test_example_2():
    """A plain row-modifier function title-cases every 'name' value."""
    from dataflows import Flow, load

    def title_name(row):
        # Mutates the row in place; no return value needed.
        row['name'] = row['name'].title()

    data, *_ = Flow(
        load('data/beatles.csv'),
        title_name,
    ).results()
github datopian / ckan-cloud-operator / scripts / aggregate access logs.py View on Github external
def main(package_url):
    """Aggregate access-log stats from *package_url*, dumping the logs and the
    collected per-row stats to two local datapackages."""
    credentials = ckan_manager.get_jenkins_token('ckan-cloud-operator-jenkins-creds')
    # Embed the Jenkins credentials into the package URL.
    package_url = package_url.replace('https://', 'https://{}:{}@'.format(*credentials))

    stats_rows = []
    # Pass 1: load the package and aggregate, collecting stats as a side effect.
    Flow(
        load(package_url),
        aggregate_stats(stats_rows),
        dump_to_path('data/aggregate_access_logs'),
    ).process()
    # Pass 2: dump the collected stats and echo them to stdout.
    Flow(
        iter(stats_rows),
        dump_to_path('data/aggregate_access_logs_stats'),
        printer(),
    ).process()
github datopian / ckan-cloud-operator / scripts / aggregate access logs.py View on Github external
def main(package_url):
    """Run the access-log aggregation: write the aggregated logs, then the
    stats rows gathered along the way, each to its own datapackage on disk."""
    token_parts = ckan_manager.get_jenkins_token('ckan-cloud-operator-jenkins-creds')
    # Inject basic-auth credentials into the package URL scheme.
    authed_url = package_url.replace('https://', 'https://{}:{}@'.format(*token_parts))
    stats_rows = []

    first_pass = Flow(
        load(authed_url),
        aggregate_stats(stats_rows),  # fills stats_rows as a side effect
        dump_to_path('data/aggregate_access_logs'),
    )
    first_pass.process()

    second_pass = Flow(
        (row for row in stats_rows),
        dump_to_path('data/aggregate_access_logs_stats'),
        printer(),
    )
    second_pass.process()
github datahq / dataflows / dataflows / processors / checkpoint.py View on Github external
from .stream import stream
from .unstream import unstream


def _notify_checkpoint_saved(checkpoint_name):

    def step(package):
        yield package.pkg
        for rows in package:
            yield (row for row in rows)
        print(f"checkpoint saved: {checkpoint_name}")

    return step


class checkpoint(Flow):

    def __init__(self, checkpoint_name, checkpoint_path='.checkpoints', steps=None, resources=None):
        """Wrap *steps* (may be None) in a Flow and remember where this
        checkpoint lives on disk: <checkpoint_path>/<checkpoint_name>."""
        super().__init__(*(steps or []))
        self.checkpoint_name = checkpoint_name
        self.checkpoint_path = os.path.join(checkpoint_path, checkpoint_name)
        self.resources = resources

    @property
    def filename(self):
        """Path of the ndjson stream file inside this checkpoint's directory."""
        parts = (self.checkpoint_path, 'stream.ndjson')
        return os.path.join(*parts)

    def exists(self):
        """True once a stream file has been written for this checkpoint."""
        target = self.filename
        return os.path.exists(target)