Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): enclosing def/fixture is outside this view and indentation
# appears stripped; code left byte-identical, comments only.
api.context(context_name=TEST_CONTEXT)
# Setup moto s3 resources
s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3')
s3_resource.create_bucket(Bucket=TEST_BUCKET)
# Make sure bucket is empty
objects = s3_client.list_objects(Bucket=TEST_BUCKET)
assert 'Contents' not in objects, 'Bucket should be empty'
# Bind remote context
api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)
# Run test pipeline
api.apply(TEST_CONTEXT, CIP)
# Push bundles to remote
# Each pipeline output must exist locally before it can be committed/pushed.
for bundle_name in ['a', 'b', 'c']:
assert api.get(TEST_CONTEXT, bundle_name) is not None, 'Bundle should exist'
api.commit(TEST_CONTEXT, bundle_name)
api.push(TEST_CONTEXT, bundle_name)
# Blow away context and recreate
api.delete_context(TEST_CONTEXT)
assert TEST_CONTEXT not in api.ls_contexts()
api.context(context_name=TEST_CONTEXT)
api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)
# The freshly recreated context must start out with no local bundles.
assert api.search(TEST_CONTEXT) == [], 'Context should be empty'
def test_single_task():
    """Apply a single task and verify that re-applying it does no new work.

    Runs task ``A`` once, checks its output bundle, then applies it again
    and asserts no additional bundle was created.
    """
    assert len(api.search(TEST_CONTEXT)) == 0, 'Context should be empty'

    api.apply(TEST_CONTEXT, A)
    data = api.get(TEST_CONTEXT, 'a').data

    assert data == 2, 'Data did not match output'
    # isinstance is the idiomatic type check; type(x) == int also rejects
    # int subclasses, which is not the intent here.
    assert isinstance(data, int), 'Data is not path'
    assert len(api.search(TEST_CONTEXT)) == 1, 'Only one bundle should be present'

    # Re-applying an unchanged task must be a no-op (no new bundle).
    api.apply(TEST_CONTEXT, A)
    assert len(api.search(TEST_CONTEXT)) == 1, 'Only one bundle should be present'
def test_name_external_dependency(run_test):
    """Verify a by-name external dependency is reused, not re-created.

    Args:
        run_test: pytest fixture that prepares the test context.
    """
    # Create the external bundle for its side effect only.  The original
    # bound the return value to a local named ``uuid``, which was unused and
    # shadowed the stdlib ``uuid`` module — dropped.
    create_bundle_from_pipeline()

    api.apply(TEST_CONTEXT, PipelineC, params={'ext_name': EXT_BUNDLE_NAME})

    # A second apply should be satisfied by the existing external bundle.
    result = api.apply(TEST_CONTEXT, PipelineC, params={'ext_name': EXT_BUNDLE_NAME})
    assert result['success'] is True
    assert result['did_work'] is False
def run_and_get(name, do_ext=False):
    """Apply the 'A_2' pipeline and return the resulting 'B' bundle.

    Args:
        name: label used only in the diagnostic print.
        do_ext: passed through as the 'set_ext_dep' pipeline parameter.

    Returns:
        The 'B' bundle object fetched from the test context.
    """
    api.apply(TEST_CONTEXT, 'A_2', params={'set_ext_dep': do_ext})
    bundle = api.get(TEST_CONTEXT, 'B')
    print("Run {}: b.creation_date {} b.uuid {}".format(name, bundle.creation_date, bundle.uuid))
    return bundle
1.) Create external dep -- also creates PreMaker_auf_datamaker
dsdt apply - - test_external_bundle.DataMaker --int_array '[1000,2000,3000]'
2.) Remove Premaker_auf_datamaker
dsdt rm PreMaker_auf_datamaker
3.) Try to run Root -- it should find DataMaker but not re-create it or PreMaker_auf_datamaker
"""
api.context(TEST_CONTEXT)
result = None
# NOTE(review): apply presumably raises here (the except branch reads an
# e.result payload) — confirm against the api.apply error contract.
try:
result = api.apply(TEST_CONTEXT, Root2, output_bundle='test_api_exit', params={}, force=True, workers=2)
except Exception as e:
print ("Got exception {} result {} ".format(e, e.result))
# The failed run is expected to have done work without overall success.
assert(e.result['did_work'])
assert(not e.result['success'])
finally:
print("API apply returned {}".format(result))
def test_managed_local():
    """Run ManagedLocal and check its bundle points at a real local file."""
    # Start from a clean context.
    api.delete_context(TEST_CONTEXT)
    api.context(context_name=TEST_CONTEXT)
    assert len(api.search(TEST_CONTEXT)) == 0, 'Context should be empty'

    api.apply(TEST_CONTEXT, ManagedLocal)
    assert len(api.search(TEST_CONTEXT)) == 1, 'Only one bundle should be present'

    print(api.cat(TEST_CONTEXT, 'b3'))

    # The managed path recorded in the bundle must exist on disk.
    bundle = api.search(TEST_CONTEXT, human_name='b3')[0]
    local_path = bundle.data['file'][0]
    assert os.path.exists(local_path), \
        'Local file should be present in bundle'
def test_dependant_tasks():
    """Apply task C and verify its upstream chain produced three bundles."""
    assert len(api.search(TEST_CONTEXT)) == 0, 'Context should be empty'

    api.apply(TEST_CONTEXT, C)
    data = api.get(TEST_CONTEXT, 'c').data

    assert data == 6, 'Data did not match output'
    # isinstance is the idiomatic type check; type(x) == int also rejects
    # int subclasses, which is not the intent here.
    assert isinstance(data, int), 'Data is not path'
    # C plus its two upstream tasks each produce one bundle.
    assert len(api.search(TEST_CONTEXT)) == 3, 'Three bundles should be present'
def test_ABC7(run_test):
    """
    7.) Run A->B->C, Run A*->B. Run A->B->C, nothing should run

    Args:
        run_test: pytest fixture that prepares the test context.

    Returns:
        None
    """
    result = api.apply(TEST_CONTEXT, C)
    assert result['success'] is True
    assert result['did_work'] is True
    B_uuid = api.get(TEST_CONTEXT, 'B').uuid

    def custom_B_requires(self):
        self.add_dependency('a', APrime, params={})

    # Monkey-patch B's requirements to depend on APrime.  Restore in a
    # finally block so a failing assertion cannot leak the patched
    # requirement into subsequent tests (the original left B patched on
    # any assertion failure).
    old_requires = B.pipe_requires
    B.pipe_requires = custom_B_requires
    try:
        result = api.apply(TEST_CONTEXT, B)
        assert result['success'] is True
        assert result['did_work'] is True
        assert B_uuid != api.get(TEST_CONTEXT, 'B').uuid  # should have a new B
    finally:
        B.pipe_requires = old_requires
class Average(PipeTask):
    """Average scores of an upstream task."""

    def pipe_requires(self):
        """Depend on GenData."""
        self.add_dependency('my_input_data', GenData, {})

    def pipe_run(self, my_input_data=None):
        """Compute average and return as a dictionary."""
        mean_value = np.average(my_input_data)
        return {'average': [mean_value]}
if __name__ == "__main__":
# Run the Average pipeline in the 'examples' context with no parameters.
api.apply('examples', 'Average', params={})
# If specified, decode the ordinary 'key:value' strings into a dictionary of tags.
input_tags = disdat.common.parse_args_tags(args.input_tag)
output_tags = disdat.common.parse_args_tags(args.output_tag)
# Convert string of pipeline args into dictionary for api.apply
deser_user_params = disdat.common.parse_params(args.pipe_cls, args.pipeline_args)
# If the user wants final and intermediate, then inc push.
if not args.no_push and not args.no_push_intermediates:
incremental_push = True
else:
incremental_push = False
# NOTE(review): this try's except/else clauses fall outside the visible
# chunk; code left byte-identical.
try:
result = disdat.api.apply(args.branch,
args.pipe_cls,
output_bundle=args.output_bundle,
input_tags=input_tags,
output_tags=output_tags,
params=deser_user_params,
output_bundle_uuid=args.output_bundle_uuid,
force=args.force,
force_all=args.force_all,
workers=args.workers,
incremental_push=incremental_push,
incremental_pull=incremental_pull)
# When not pushing incrementally, push uncommitted intermediates after the
# run (only if pushes are enabled at all).
if not incremental_push:
if not args.no_push:
if not args.no_push_intermediates:
to_push = disdat.api.search(args.branch, is_committed=False, find_intermediates=True)