# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_pull(run_test):
    """Push a committed bundle to a remote, wipe the local context, and verify pull restores it.

    Assumes the `run_test` fixture supplies a clean local context and a
    mocked S3 endpoint (e.g. moto) -- TODO confirm against conftest.
    """
    s3_client = boto3.client('s3')
    s3_resource = boto3.resource('s3')
    s3_resource.create_bucket(Bucket=TEST_BUCKET)

    # Sanity: both the S3 bucket and the local context start out empty.
    objects = s3_client.list_objects(Bucket=TEST_BUCKET)
    assert 'Contents' not in objects, 'Bucket should be empty'
    assert len(api.search(TEST_CONTEXT)) == 0, 'Context should be empty'

    # Produce a bundle locally, then commit and push it to the remote.
    api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)
    api.apply(TEST_CONTEXT, RemoteTest)
    bundle = api.get(TEST_CONTEXT, 'remote_test')
    assert bundle.data == 'Hello'
    bundle.commit()
    bundle.push()

    # The push must have landed at least one object in the bucket.
    objects = s3_client.list_objects(Bucket=TEST_BUCKET)
    assert 'Contents' in objects, 'Bucket should not be empty'
    assert len(objects['Contents']) > 0, 'Bucket should not be empty'

    # Blow away the local context, re-attach the remote, and pull everything back.
    api.delete_context(context_name=TEST_CONTEXT)
    api.context(context_name=TEST_CONTEXT)
    api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)
    api.pull(TEST_CONTEXT)

    # Fix: the original computed the search result but never asserted on it,
    # so the pull was effectively unverified.
    pulled_bundles = api.search(TEST_CONTEXT)
    assert len(pulled_bundles) > 0, 'Pull should restore at least one bundle'
def run_and_get(name, do_ext=False):
    """Run the A_2 pipeline and return the downstream 'B' bundle.

    Prints the bundle's creation date and UUID tagged with *name* so
    successive runs can be compared in the test output.
    """
    api.apply(TEST_CONTEXT, 'A_2', params={'set_ext_dep': do_ext})
    result = api.get(TEST_CONTEXT, 'B')
    print("Run {}: b.creation_date {} b.uuid {}".format(name, result.creation_date, result.uuid))
    return result
def test_dict_task():
    """DictTask should emit exactly one bundle whose data is the expected dict."""
    setup()
    assert len(api.search(TEST_CONTEXT)) == 0, 'Context should be empty'

    api.apply(TEST_CONTEXT, DictTask)
    data = api.get(TEST_CONTEXT, 'dict_task').data

    assert data == {
        'hello': ['world']
    }, 'Data did not match output'
    # isinstance() is the idiomatic type check (was: type(data) == dict).
    assert isinstance(data, dict), 'Data is not dict'
    assert len(api.search(TEST_CONTEXT)) == 1, 'Only one bundle should be present'
def test_file_task():
    """FileTask should emit one bundle whose data is a local file path containing '5'."""
    assert len(api.search(TEST_CONTEXT)) == 0, 'Context should be empty'

    api.apply(TEST_CONTEXT, FileTask)
    output_path = api.get(TEST_CONTEXT, 'file_task').data

    with open(output_path) as f:
        output = f.read()

    assert output == '5', 'Data did not match output'
    # isinstance() is the idiomatic type check (was: type(output_path )== str).
    assert isinstance(output_path, str), 'Data is not path'
    assert len(api.search(TEST_CONTEXT)) == 1, 'Only one bundle should be present'
# NOTE(review): interior of a test function whose `def` is outside this view;
# code left byte-identical, comments only.
# Blow away context and recreate
api.delete_context(TEST_CONTEXT)
assert TEST_CONTEXT not in api.ls_contexts()
api.context(context_name=TEST_CONTEXT)
api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)
assert api.search(TEST_CONTEXT) == [], 'Context should be empty'
# Pull bundles from remote
api.pull(TEST_CONTEXT)
# Make sure all bundle meta data comes down but data remains in S3
# (pull without localize should leave payloads as s3:// links).
for bundle_name in ['a', 'b', 'c']:
bundle = api.get(TEST_CONTEXT, bundle_name)
assert bundle is not None, 'Bundle should exist'
data_path = bundle.data['file'][0]
assert data_path.startswith('s3://'), 'Data should be in S3'
# Rerun pipeline
api.apply(TEST_CONTEXT, BIP, params={'n': 100}, incremental_pull=True)
# Make sure all bundles exist. Bundles a and b should have local paths
# after incremental_pull localized the upstream inputs it needed.
for bundle_name in ['a', 'b', 'c']:
bundle = api.get(TEST_CONTEXT, bundle_name)
assert bundle is not None, 'Bundle should exist'
data_path = bundle.data['file'][0]
if bundle_name in ['a', 'b']:
assert not data_path.startswith('s3://'), 'Data should be local'
# NOTE(review): fragment starting mid-call (apply invocation) inside a larger
# entrypoint whose `def`/`try` header is outside this view; code left
# byte-identical, comments only.
output_bundle_uuid=args.output_bundle_uuid,
force=args.force,
force_all=args.force_all,
workers=args.workers,
incremental_push=incremental_push,
incremental_pull=incremental_pull)
# Unless bundles were already pushed incrementally during the run,
# push results now (subject to the user's --no-push flags).
if not incremental_push:
if not args.no_push:
if not args.no_push_intermediates:
# Push uncommitted intermediate bundles first.
to_push = disdat.api.search(args.branch, is_committed=False, find_intermediates=True)
for b in to_push:
_commit_and_push(b)
if result['did_work']:
_logger.info("Pipeline ran. Committing and pushing output bundle UUID {}.".format(args.output_bundle_uuid))
b = disdat.api.get(None, uuid=args.output_bundle_uuid)
assert(b is not None)
_commit_and_push(b)
else:
_logger.info("Pipeline ran but did no useful work (output bundle exists).")
else:
_logger.info("Pipeline ran but user specified not to push any bundles to remote context.")
else:
_logger.info("Pipeline ran using incremental pushing.")
# Map both failure modes to an I/O error exit code for the container runner.
except RuntimeError as re:
_logger.error('Failed to run pipeline: RuntimeError {}'.format(re))
sys.exit(os.EX_IOERR)
except disdat.common.ApplyError as ae:
_logger.error('Failed to run pipeline: ApplyException {}'.format(ae))
sys.exit(os.EX_IOERR)
# NOTE(review): fragment of a bundle-resolution routine whose enclosing `def`
# is outside this view; code left byte-identical, comments only.
if pipe.force and not isinstance(pipe, ExternalDepTask):
# Forcing recomputation through a manual --force directive
# If it is external, do not recompute in any case
_logger.debug("resolve_bundle: --force forcing a new output bundle.")
if verbose: print("resolve_bundle: --force forcing a new output bundle.\n")
new_output_bundle(pipe, data_context)
return regen_bundle
if isinstance(pipe, ExternalDepTask):
# NOTE: Even if add_external_dependency() fails to find the bundle we still succeed here.
# Thus it can look like we reuse a bundle, when in fact we don't. We error either
# within the user's requires, add_external_dependency(), or when Luigi can't find the task (current approach)
assert worker._is_external(pipe)
if verbose: print("resolve_bundle: found ExternalDepTask re-using bundle with UUID[{}].\n".format(pipe.uuid))
b = api.get(data_context.get_local_name(), None, uuid=pipe.uuid)  # TODO:cache b in ext dep object, no 2x lookup
if b is None:
_logger.warn(f"Unable to resolve bundle[{pipe.uuid}] in context[{data_context.get_local_name()}]")
reuse_bundle(pipe, b, pipe.uuid, data_context)  # Ensure that the PCE results in a file that cannot be found
else:
reuse_bundle(pipe, b, b.uuid, data_context)
return use_bundle
# No force and not external: look up an existing bundle by processing name.
bndls = api.search(data_context.get_local_name(),
processing_name=pipe.processing_id())
if bndls is None or len(bndls) <= 0:
if verbose: print("resolve_bundle: No bundle with proc_name {}, getting new output bundle.\n".format(pipe.processing_id()))
# no bundle, force recompute
new_output_bundle(pipe, data_context)
return regen_bundle