Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
for i in range(3):
with b.add_file('output_{}'.format(i)).open('w') as of:
of.write("some text for the {} file".format(i))
b.commit().push()
b.rm()
b.pull(localize=False)
api.apply(TEST_CONTEXT, 'test_output', 'ConsumeExtDep', incremental_pull=True)
api.delete_context(TEST_CONTEXT, remote=True)
class ConsumeExtDep(PipeTask):
""" Consume files from an external dependency
"""
def pipe_requires(self):
""" We depend on a manually created bundle that
is parameterized by its name and its owner
"""
self.add_external_dependency("input_files",
api.BundleWrapperTask,
{'name': TEST_NAME,
'owner': getpass.getuser()})
def pipe_run(self, input_files=None):
""" For each file, print out its name and contents.
"""
max_len = 0
class DataMaker(PipeTask):
    """ Task that converts its ``int_array`` parameter into a numpy array.

    Original note: run this by itself; then B requires DataMaker as
    external, and A.
    """

    int_array = luigi.ListParameter(default=[0, 1])

    def pipe_requires(self, pipeline_input=None):
        """ Set the bundle name to "DataMaker"; no dependencies are added. """
        self.set_bundle_name("DataMaker")

    def pipe_run(self, pipeline_input=None):
        """ Return ``np.array`` built from the ``int_array`` parameter. """
        values = self.int_array
        return np.array(values)
class Root(PipeTask):
    """ Task that depends on DataMaker and returns the mean of its output. """

    int_array = luigi.ListParameter(default=None)

    def pipe_requires(self, pipeline_input=None):
        """ Add DataMaker as the 'datamaker' dependency, forwarding ``int_array``. """
        datamaker_params = {'int_array': self.int_array}
        self.add_dependency('datamaker', DataMaker, datamaker_params)

    def pipe_run(self, pipeline_input=None, datamaker=None):
        """ Print the received ``datamaker`` value and return ``datamaker.mean()``. """
        message = "Root received a datamaker {}".format(datamaker)
        print(message)
        return datamaker.mean()
# Script entry point: call test() only when this file is executed directly.
if __name__ == '__main__':
    test()
class NonManagedLocal(PipeTask):
    """ Task that writes a parquet file to a fixed local path and returns it. """

    def pipe_requires(self):
        """ Set the bundle name to 'b1'. """
        self.set_bundle_name('b1')

    def pipe_run(self):
        """ Write a small two-column DataFrame as parquet under /tmp.

        Returns:
            dict: ``{'file': [<path of the written parquet file>]}``
        """
        # Task output was created on some local path
        local_file = '/tmp/test.parquet'
        frame = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 4]})
        frame.to_parquet(local_file)
        return {'file': [local_file]}
class NonManagedS3(PipeTask):
    """ Task that writes a parquet file to an S3 path and returns that path. """

    def pipe_requires(self):
        """ Set the bundle name to 'b2'. """
        self.set_bundle_name('b2')

    def pipe_run(self):
        """ Write a small two-column DataFrame as parquet to the TEST_BUCKET_OTHER bucket.

        Returns:
            dict: ``{'file': [<s3 path of the written parquet file>]}``
        """
        # Task output was created on some S3 path
        s3_file = 's3://{}/test.parquet'.format(TEST_BUCKET_OTHER)
        frame = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 4]})
        frame.to_parquet(s3_file)
        return {'file': [s3_file]}
class ManagedLocal(PipeTask):
def pipe_requires(self):
self.set_bundle_name('b3')
""" The purpose of this test is to show that even if an upstream task returns
nothing, the downstream task still receives the corresponding input argument
(as None). See GitHub issue https://github.com/kyocum/disdat/issues/31
"""
class a(PipeTask):
    """ Upstream task that adds no dependencies and returns nothing. """

    def pipe_requires(self):
        """ Add no dependencies. """
        return None

    def pipe_run(self):
        """ Return None. """
        return None
class b(PipeTask):
    """ Downstream task that depends on ``a`` and checks the input it receives. """

    def pipe_requires(self):
        """ Add task ``a`` as the 'something' dependency with no parameters. """
        self.add_dependency('something', a, {})

    def pipe_run(self, something=None):
        """ Print the type and value of ``something`` and assert it is None. """
        description = "Return type {}, object: {}".format(type(something), something)
        print(description)
        assert something is None
def test_requires(run_test):
    """ Apply task ``b`` in TEST_CONTEXT with an empty parameter dict.

    Args:
        run_test: pytest fixture; requested here but not referenced in the body.
    """
    no_params = {}
    api.apply(TEST_CONTEXT, b, params=no_params)
# Script entry point: run pytest on this file when it is executed directly.
if __name__ == '__main__':
    this_file = __file__
    pytest.main([this_file])
test_param = luigi.Parameter(default=EXT_TASK_PARAM_VAL)
throw_assert = luigi.BoolParameter(default=True)
def pipe_requires(self):
self.set_bundle_name('pipeline_a')
b = self.add_external_dependency('ext_input', ExternalPipeline, {'test_param': self.test_param})
if self.throw_assert:
assert b is not None
assert list(b.data) == BUNDLE_CONTENTS
def pipe_run(self, ext_input=None):
assert list(ext_input) == BUNDLE_CONTENTS
return True
class PipelineB(PipeTask):
    """ Task that pulls in ExternalPipeline's output by uuid and validates it. """

    ext_uuid = luigi.Parameter()

    def pipe_requires(self):
        """ Set the bundle name and add ExternalPipeline as an external dependency.

        The dependency is requested with empty parameters and ``uuid=self.ext_uuid``;
        the returned object must be non-None and its ``data`` must equal
        BUNDLE_CONTENTS.
        """
        self.set_bundle_name('pipeline_b')
        ext_bundle = self.add_external_dependency(
            'ext_input', ExternalPipeline, {}, uuid=self.ext_uuid)
        assert ext_bundle is not None
        assert list(ext_bundle.data) == BUNDLE_CONTENTS

    def pipe_run(self, ext_input=None):
        """ Assert ``ext_input`` equals BUNDLE_CONTENTS, then return True. """
        received = list(ext_input)
        assert received == BUNDLE_CONTENTS
        return True
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import luigi
from disdat.pipe import PipeTask
import disdat.api as api
from tests.functional.common import run_test, TEST_CONTEXT
class A(PipeTask):
    """ Task with bundle name 'a' that returns the constant 2. """

    def pipe_requires(self, pipeline_input=None):
        """ Set the bundle name to 'a'. """
        self.set_bundle_name('a')

    def pipe_run(self, pipeline_input=None):
        """ Return 2. """
        result = 2
        return result
class B(PipeTask):
    """ Task with bundle name 'b' that returns twice its ``n`` parameter. """

    n = luigi.IntParameter()

    def pipe_requires(self, pipeline_input=None):
        """ Set the bundle name to 'b'. """
        self.set_bundle_name('b')

    def pipe_run(self, pipeline_input=None):
        """ Return ``2 * self.n``. """
        doubled = 2 * self.n
        return doubled
return self.a+self.b
class APrime(PipeTask):
    """ Task with no dependencies that returns the sum of its two parameters. """

    a = luigi.IntParameter(default=2)
    b = luigi.IntParameter(default=1)

    def pipe_requires(self):
        """ Add no dependencies. """
        return None

    def pipe_run(self):
        """ Return ``self.a + self.b``. """
        total = self.a + self.b
        return total
class B(PipeTask):
    """ Task that depends on A and adds its own two parameters to A's output. """

    a = luigi.IntParameter(default=3)
    b = luigi.IntParameter(default=4)

    def pipe_requires(self):
        """ Add task A as the 'a' dependency with empty parameters. """
        self.add_dependency('a', A, params={})

    def pipe_run(self, a):
        """ Return ``a + self.a + self.b`` where ``a`` is the dependency's value. """
        partial = a + self.a
        return partial + self.b
class C(PipeTask):
a = luigi.IntParameter(default=5)
b = luigi.IntParameter(default=6)
# Default value used for the ``test_param`` parameter of the pipeline tasks.
EXT_TASK_PARAM_VAL = 'this is a test value'
class ExternalPipeline(PipeTask):
    """ External Pipeline

    Task that other pipelines reference as an external dependency. It sets
    the bundle name to 'external_pipeline' and returns BUNDLE_CONTENTS.
    """
    # The docstring must be the first statement in the class body; placed
    # after ``test_param`` it was a discarded string expression and
    # ``ExternalPipeline.__doc__`` was never set from it.

    # Free-form parameter; only echoed in pipe_run's log line.
    test_param = luigi.Parameter()

    def pipe_requires(self):
        """ Set the bundle name to 'external_pipeline'; add no dependencies. """
        self.set_bundle_name('external_pipeline')

    def pipe_run(self):
        """ Print the received parameter and return BUNDLE_CONTENTS.

        Returns:
            The module-level BUNDLE_CONTENTS value.
        """
        print("ExternalPipeline called with parameter [{}]".format(self.test_param))
        return BUNDLE_CONTENTS
class PipelineA(PipeTask):
    """ Task that pulls in ExternalPipeline's output by parameters and validates it. """

    test_param = luigi.Parameter(default=EXT_TASK_PARAM_VAL)
    throw_assert = luigi.BoolParameter(default=True)

    def pipe_requires(self):
        """ Set the bundle name and add ExternalPipeline as an external dependency.

        The dependency is requested with ``test_param`` forwarded. Only when
        ``throw_assert`` is true is the returned object checked: it must be
        non-None and its ``data`` must equal BUNDLE_CONTENTS.
        """
        self.set_bundle_name('pipeline_a')
        ext_params = {'test_param': self.test_param}
        ext_bundle = self.add_external_dependency('ext_input', ExternalPipeline, ext_params)
        if not self.throw_assert:
            return
        assert ext_bundle is not None
        assert list(ext_bundle.data) == BUNDLE_CONTENTS

    def pipe_run(self, ext_input=None):
        """ Assert ``ext_input`` equals BUNDLE_CONTENTS, then return True. """
        received = list(ext_input)
        assert received == BUNDLE_CONTENTS
        return True
class PipelineB(PipeTask):
""" These are simple tasks used for test_api_run """
# Default integer list shared by the tasks below (default of A.int_array).
COMMON_DEFAULT_ARGS = [10, 100, 1000]
class B(PipeTask):
    """ B required by A; returns its ``int_array`` parameter unchanged. """

    int_array = luigi.ListParameter(default=None)

    def pipe_run(self):
        """ Print the type of ``int_array`` and return the parameter value. """
        values = self.int_array
        print("B saving type [{}]".format(type(values)))
        return values
class A(PipeTask):
    """ A is the root task; it sums the values produced by B. """

    int_array = luigi.ListParameter(default=COMMON_DEFAULT_ARGS)

    def pipe_requires(self):
        """ Add task B as the 'b' dependency, forwarding ``int_array``. """
        b_params = {'int_array': self.int_array}
        self.add_dependency('b', B, b_params)

    def pipe_run(self, b=None):
        """ Print the value and type of ``b`` and return the sum of its items. """
        print("Saving the sum of B {}".format(b))
        print("A got type [{}]".format(type(b)))
        items = list(b)
        return sum(items)
pm_uuid = b.uuid
b.rm()
api.apply(TEST_CONTEXT, Root_1)
b = api.get(TEST_CONTEXT, 'PreMaker')
assert(b is not None)
assert(b.uuid != pm_uuid)
b = api.get(TEST_CONTEXT, 'Root_1')
assert(b is not None)
api.delete_context(TEST_CONTEXT)
class DataMaker(PipeTask):
    """ Task that depends on PreMaker and returns ``int_array`` as a numpy array.

    Original note: run this by itself; then B requires DataMaker as
    external, and A.
    """

    int_array = luigi.ListParameter(default=[1, 2, 3, 5, 8])

    def pipe_requires(self):
        """ Set the bundle name and add PreMaker as the 'premaker' dependency. """
        self.set_bundle_name("DataMaker")
        self.add_dependency('premaker', PreMaker, params={})

    def pipe_run(self, premaker=None):
        """ Return ``np.array`` of ``int_array``; ``premaker`` is not used. """
        values = self.int_array
        return np.array(values)
class PreMaker(PipeTask):