How to use the disdat.pipe.PipeTask class in disdat

To help you get started, we’ve selected a few disdat examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github kyocum / disdat / tests / pipelines / test_inc_pull.py View on Github external
for i in range(3):
            with b.add_file('output_{}'.format(i)).open('w') as of:
                of.write("some text for the {} file".format(i))

    b.commit().push()

    b.rm()

    b.pull(localize=False)

    api.apply(TEST_CONTEXT, 'test_output', 'ConsumeExtDep', incremental_pull=True)

    api.delete_context(TEST_CONTEXT, remote=True)


class ConsumeExtDep(PipeTask):
    """ Consume files from an external dependency
    """

    def pipe_requires(self):
        """ Register the manually created bundle as an external dependency.

        The bundle is identified by its name (TEST_NAME) and its owner
        (the current user, as reported by getpass.getuser()), and is
        registered under the key "input_files".
        """
        bundle_params = {'name': TEST_NAME, 'owner': getpass.getuser()}
        self.add_external_dependency("input_files", api.BundleWrapperTask, bundle_params)

    def pipe_run(self, input_files=None):
        """ For each file, print out its name and contents.
        """
        max_len = 0
github kyocum / disdat / tests / pipelines / test_api_run.py View on Github external
class DataMaker(PipeTask):
    """ Stand-alone producer task; run it by itself first.

    Afterwards B can require DataMaker as an external dependency, and A.
    """

    int_array = luigi.ListParameter(default=[0, 1])

    def pipe_requires(self, pipeline_input=None):
        """ No dependencies are added; only the bundle name is set. """
        self.set_bundle_name("DataMaker")

    def pipe_run(self, pipeline_input=None):
        """ Return the int_array parameter converted to a numpy array. """
        values = np.array(self.int_array)
        return values


class Root(PipeTask):
    """ Task that depends on DataMaker and returns the mean of what it receives. """

    int_array = luigi.ListParameter(default=None)

    def pipe_requires(self, pipeline_input=None):
        """ Add DataMaker as the 'datamaker' dependency, forwarding int_array. """
        maker_params = {'int_array': self.int_array}
        self.add_dependency('datamaker', DataMaker, maker_params)

    def pipe_run(self, pipeline_input=None, datamaker=None):
        """ Print the received 'datamaker' value and return its mean(). """
        message = "Root received a datamaker {}".format(datamaker)
        print(message)
        return datamaker.mean()


if __name__ == '__main__':
    # Script entry point: call test() when this file is executed directly.
    test()
github kyocum / disdat / tests / functional / test_managed.py View on Github external
class NonManagedLocal(PipeTask):
    """ Task that writes a parquet file to a fixed local path and returns that path. """
    def pipe_requires(self):
        """ Set the bundle name to 'b1'. """
        self.set_bundle_name('b1')

    def pipe_run(self):
        """ Write a two-column DataFrame to /tmp/test.parquet; return {'file': [path]}. """
        # The task output is created on a local path chosen by the task itself
        frame = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 4]})
        out_path = '/tmp/test.parquet'
        frame.to_parquet(out_path)
        return {'file': [out_path]}


class NonManagedS3(PipeTask):
    """ Task that writes a parquet file to an S3 path and returns that path. """
    def pipe_requires(self):
        """ Set the bundle name to 'b2'. """
        self.set_bundle_name('b2')

    def pipe_run(self):
        """ Write a two-column DataFrame to the TEST_BUCKET_OTHER bucket; return {'file': [path]}. """
        # The task output is created on an S3 path chosen by the task itself
        frame = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 4]})
        s3_path = 's3://{}/test.parquet'.format(TEST_BUCKET_OTHER)
        frame.to_parquet(s3_path)
        return {'file': [s3_path]}


class ManagedLocal(PipeTask):
    def pipe_requires(self):
        """ Set the bundle name to 'b3'; no dependencies are added here. """
        self.set_bundle_name('b3')
github kyocum / disdat / tests / functional / test_requires.py View on Github external
""" Purpose of this test is to show that if you return nothing, you 
still need to get the input in the downstream task.  See git issue 
 https://github.com/kyocum/disdat/issues/31
 """


class a(PipeTask):
    """ Upstream task that requires nothing and returns nothing. """
    def pipe_requires(self):
        """ Declare no dependencies. """
        return None

    def pipe_run(self):
        """ Produce no output (returns None). """
        return None


class b(PipeTask):
    """ Downstream task that depends on `a` and checks what it receives from it. """
    def pipe_requires(self):
        """ Add task `a` as the 'something' dependency with no parameters. """
        no_params = {}
        self.add_dependency('something', a, no_params)

    def pipe_run(self, something=None):
        """ Print the type and value of 'something' and assert that it is None. """
        report = "Return type {}, object: {}".format(type(something), something)
        print(report)
        assert something is None


def test_requires(run_test):
    """ Apply task `b` in TEST_CONTEXT with an empty parameter dict. """
    empty_params = {}
    api.apply(TEST_CONTEXT, b, params=empty_params)


if __name__ == '__main__':
    # Run this file through pytest when it is executed directly.
    pytest.main([__file__])
github kyocum / disdat / tests / functional / test_external_dep.py View on Github external
test_param = luigi.Parameter(default=EXT_TASK_PARAM_VAL)
    throw_assert = luigi.BoolParameter(default=True)

    def pipe_requires(self):
        """ Set the bundle name and add ExternalPipeline as an external dependency.

        When throw_assert is set, also assert that a bundle came back and
        that its data equals BUNDLE_CONTENTS.
        """
        self.set_bundle_name('pipeline_a')
        ext_params = {'test_param': self.test_param}
        ext_bundle = self.add_external_dependency('ext_input', ExternalPipeline, ext_params)
        if not self.throw_assert:
            return
        assert ext_bundle is not None
        assert list(ext_bundle.data) == BUNDLE_CONTENTS

    def pipe_run(self, ext_input=None):
        """ Assert that ext_input, as a list, equals BUNDLE_CONTENTS; return True. """
        received = list(ext_input)
        assert received == BUNDLE_CONTENTS
        return True


class PipelineB(PipeTask):
    """ Pipeline that adds ExternalPipeline as an external dependency, passing a uuid. """
    ext_uuid = luigi.Parameter()

    def pipe_requires(self):
        """ Add the external dependency with uuid=ext_uuid and check the returned bundle. """
        self.set_bundle_name('pipeline_b')
        ext_bundle = self.add_external_dependency(
            'ext_input', ExternalPipeline, {}, uuid=self.ext_uuid)
        assert ext_bundle is not None
        assert list(ext_bundle.data) == BUNDLE_CONTENTS

    def pipe_run(self, ext_input=None):
        """ Assert that ext_input, as a list, equals BUNDLE_CONTENTS; return True. """
        received = list(ext_input)
        assert received == BUNDLE_CONTENTS
        return True
github kyocum / disdat / tests / functional / test_pipeline.py View on Github external
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import luigi

from disdat.pipe import PipeTask
import disdat.api as api
from tests.functional.common import run_test, TEST_CONTEXT


class A(PipeTask):
    """ Task with bundle name 'a' that returns the constant 2. """
    def pipe_requires(self, pipeline_input=None):
        """ Set the bundle name; no dependencies are added. """
        self.set_bundle_name('a')

    def pipe_run(self, pipeline_input=None):
        """ Return 2. """
        result = 2
        return result


class B(PipeTask):
    """ Task with bundle name 'b' that returns twice its parameter n. """

    n = luigi.IntParameter()

    def pipe_requires(self, pipeline_input=None):
        """ Set the bundle name; no dependencies are added. """
        self.set_bundle_name('b')

    def pipe_run(self, pipeline_input=None):
        """ Return 2 * n. """
        doubled = 2 * self.n
        return doubled
github kyocum / disdat / tests / functional / test_reuse_logic.py View on Github external
return self.a+self.b


class APrime(PipeTask):
    """ Task with no dependencies that returns the sum of its parameters a and b. """

    a = luigi.IntParameter(default=2)
    b = luigi.IntParameter(default=1)

    def pipe_requires(self):
        """ Declare no dependencies. """
        return None

    def pipe_run(self):
        """ Return a + b. """
        total = self.a + self.b
        return total


class B(PipeTask):
    """ Task that depends on A and adds its own parameters to the value it receives. """

    a = luigi.IntParameter(default=3)
    b = luigi.IntParameter(default=4)

    def pipe_requires(self):
        """ Add task A as the 'a' dependency with no parameters. """
        no_params = {}
        self.add_dependency('a', A, params=no_params)

    def pipe_run(self, a):
        """ Return the received 'a' value plus this task's a and b parameters. """
        partial = a + self.a
        return partial + self.b


class C(PipeTask):

    a = luigi.IntParameter(default=5)
    b = luigi.IntParameter(default=6)
github kyocum / disdat / tests / functional / test_external_dep.py View on Github external
EXT_TASK_PARAM_VAL='this is a test value'


class ExternalPipeline(PipeTask):
    """ External Pipeline

    Task that sets the bundle name 'external_pipeline' and returns
    BUNDLE_CONTENTS.  PipelineA in this file adds it as an external
    dependency.
    """
    # The docstring must be the first statement in the class body; placed
    # after this assignment it was a bare string and __doc__ stayed None.
    test_param = luigi.Parameter()

    def pipe_requires(self):
        """ Set the bundle name; no dependencies are added. """
        self.set_bundle_name('external_pipeline')

    def pipe_run(self):
        """ Print the received test_param and return BUNDLE_CONTENTS. """
        print("ExternalPipeline called with parameter [{}]".format(self.test_param))
        return BUNDLE_CONTENTS


class PipelineA(PipeTask):
    """ Pipeline that adds ExternalPipeline as an external dependency, selected by test_param. """
    test_param = luigi.Parameter(default=EXT_TASK_PARAM_VAL)
    throw_assert = luigi.BoolParameter(default=True)

    def pipe_requires(self):
        """ Add the external dependency; if throw_assert is set, check the returned bundle. """
        self.set_bundle_name('pipeline_a')
        ext_params = {'test_param': self.test_param}
        ext_bundle = self.add_external_dependency('ext_input', ExternalPipeline, ext_params)
        if not self.throw_assert:
            return
        assert ext_bundle is not None
        assert list(ext_bundle.data) == BUNDLE_CONTENTS

    def pipe_run(self, ext_input=None):
        """ Assert that ext_input, as a list, equals BUNDLE_CONTENTS; return True. """
        received = list(ext_input)
        assert received == BUNDLE_CONTENTS
        return True


class PipelineB(PipeTask):
github kyocum / disdat / tests / functional / common_tasks.py View on Github external
""" These are simple tasks used for test_api_run """

COMMON_DEFAULT_ARGS=[10, 100, 1000]


class B(PipeTask):
    """ Task B, required by A; returns its int_array parameter unchanged. """
    int_array = luigi.ListParameter(default=None)

    def pipe_run(self):
        """ Print the type of int_array and return it. """
        type_report = "B saving type [{}]".format(type(self.int_array))
        print(type_report)
        return self.int_array


class A(PipeTask):
    """ A is the root task: it requires B and returns the sum of B's output. """
    int_array = luigi.ListParameter(default=COMMON_DEFAULT_ARGS)

    def pipe_requires(self):
        """ Add task B as the 'b' dependency, forwarding int_array. """
        self.add_dependency('b', B, {'int_array': self.int_array})

    def pipe_run(self, b=None):
        """ Print the received 'b' value and its type, then return the sum of its items. """
        print("Saving the sum of B {}".format(b))
        print("A got type [{}]".format(type(b)))
        # sum() accepts any iterable; copying b into a list first is unnecessary.
        return sum(b)
github kyocum / disdat / tests / functional / test_external_bundle.py View on Github external
pm_uuid = b.uuid
    b.rm()

    api.apply(TEST_CONTEXT, Root_1)

    b = api.get(TEST_CONTEXT, 'PreMaker')
    assert(b is not None)
    assert(b.uuid != pm_uuid)

    b = api.get(TEST_CONTEXT, 'Root_1')
    assert(b is not None)

    api.delete_context(TEST_CONTEXT)


class DataMaker(PipeTask):
    """ Producer task; run it by itself first.

    Afterwards B can require DataMaker as an external dependency, and A.
    """

    int_array = luigi.ListParameter(default=[1, 2, 3, 5, 8])

    def pipe_requires(self):
        """ Set the bundle name and add PreMaker as the 'premaker' dependency. """
        self.set_bundle_name("DataMaker")
        no_params = {}
        self.add_dependency('premaker', PreMaker, params=no_params)

    def pipe_run(self, premaker=None):
        """ Return int_array as a numpy array; the 'premaker' value is not used. """
        values = np.array(self.int_array)
        return values


class PreMaker(PipeTask):