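# Commit and push a bundle, then pull it back into a fresh context and
# verify the data round-trips.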
assert bundle.data == 'Hello'
bundle.commit()
bundle.push()
objects = s3_client.list_objects(Bucket=TEST_BUCKET)
assert 'Contents' in objects, 'Bucket should not be empty'
assert len(objects['Contents']) > 0, 'Bucket should not be empty'
api.delete_context(context_name=TEST_CONTEXT)
api.context(context_name=TEST_CONTEXT)
api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)
api.pull(TEST_CONTEXT)
pulled_bundles = api.search(TEST_CONTEXT)
assert len(pulled_bundles) > 0, 'Should have pulled bundles down'
assert pulled_bundles[0].data == 'Hello', 'Bundle contains correct data'
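# Tear down: empty and delete the moto-backed bucket.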
bucket.objects.all().delete()
bucket.delete()
# Setup moto s3 resources
s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3')
s3_resource.create_bucket(Bucket=TEST_BUCKET)
s3_resource.create_bucket(Bucket=TEST_BUCKET_OTHER)
# Make sure bucket is empty
objects = s3_client.list_objects(Bucket=TEST_BUCKET)
assert 'Contents' not in objects, 'Bucket should be empty'
objects = s3_client.list_objects(Bucket=TEST_BUCKET_OTHER)
assert 'Contents' not in objects, 'Bucket should be empty'
# Apply
api.apply(TEST_CONTEXT, NonManagedS3)
print(api.cat(TEST_CONTEXT, 'b2'))
assert len(api.search(TEST_CONTEXT)) == 1, 'One bundle should be present'
assert os.path.exists(api.search(TEST_CONTEXT, human_name='b2')[0].data['file'][0]), \
    'Non Managed S3 file should be copied to local'
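# Apply A twice with identical params: the second run should be a cache hit
# (did_work is False) and should not create a new bundle.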
result = api.apply(TEST_CONTEXT, A)
assert result['did_work'] is True
first_A_uuid = api.get(TEST_CONTEXT, 'A').uuid
result = api.apply(TEST_CONTEXT, A)
assert result['did_work'] is False
second_A_uuid = api.get(TEST_CONTEXT, 'A').uuid
assert first_A_uuid == second_A_uuid
assert len(api.search(TEST_CONTEXT, 'A')) == 1
# Mod args, should re-run
result = api.apply(TEST_CONTEXT, A, params={'a': 2, 'b': 3})
assert result['did_work'] is True
next_A_uuid = api.get(TEST_CONTEXT, 'A').uuid
assert next_A_uuid != second_A_uuid
assert len(api.search(TEST_CONTEXT, 'A')) == 2
def test_non_managed_local():
    api.delete_context(TEST_CONTEXT)
    api.context(context_name=TEST_CONTEXT)
    assert len(api.search(TEST_CONTEXT)) == 0, 'Context should be empty'
    api.apply(TEST_CONTEXT, NonManagedLocal)
    assert len(api.search(TEST_CONTEXT)) == 1, 'Only one bundle should be present'
    print(api.cat(TEST_CONTEXT, 'b1'))
    assert os.path.exists(api.search(TEST_CONTEXT, human_name='b1')[0].data['file'][0]), \
        'Local file should be present in bundle'
def test_task_with_parameter():
    assert len(api.search(TEST_CONTEXT)) == 0, 'Context should be empty'
    api.apply(TEST_CONTEXT, B, params={'n': 10})
    data = api.get(TEST_CONTEXT, 'b').data
    assert data == 20, 'Data did not match output'
    assert type(data) == int, 'Data is not an int'
    assert len(api.search(TEST_CONTEXT)) == 1, 'One bundle should be present'
    api.apply(TEST_CONTEXT, B, params={'n': 20})
    data = api.get(TEST_CONTEXT, 'b').data
    assert data == 40, 'Data did not match output'
    assert type(data) == int, 'Data is not an int'
    assert len(api.search(TEST_CONTEXT)) == 2, 'Two bundles should be present'
def test_int_task():
    assert len(api.search(TEST_CONTEXT)) == 0, 'Context should be empty'
    api.apply(TEST_CONTEXT, IntTask)
    data = api.get(TEST_CONTEXT, 'int_task').data
    assert data == 1, 'Data did not match output'
    assert type(data) == int, 'Data is not an int'
    assert len(api.search(TEST_CONTEXT)) == 1, 'Only one bundle should be present'
# Make sure bucket is empty
objects = s3_client.list_objects(Bucket=TEST_BUCKET)
assert 'Contents' not in objects, 'Bucket should be empty'
objects = s3_client.list_objects(Bucket=TEST_BUCKET_OTHER)
assert 'Contents' not in objects, 'Bucket should be empty'
# Bind remote context
api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)
# Apply
api.apply(TEST_CONTEXT, NonManagedS3)
print(api.cat(TEST_CONTEXT, 'b2'))
# The local context should not contain the file if a remote exists.
b = api.search(TEST_CONTEXT, human_name='b2')[0]
assert not os.path.exists(b.data['file'][0]), 'Non Managed S3 file w/ remote should be copied to remote'
assert b.data['file'][0].startswith("s3://")
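
# A minimal sketch (an assumption, not from this module: these fixtures run
# under moto, as the setup comment above suggests; mock_s3 is the moto<5
# decorator) of how the S3 tests can be isolated so boto3 never touches AWS:
import boto3
from moto import mock_s3

@mock_s3
def test_bucket_starts_empty():
    # Buckets created inside the mock exist only in memory.
    s3_client = boto3.client('s3')
    boto3.resource('s3').create_bucket(Bucket=TEST_BUCKET)
    objects = s3_client.list_objects(Bucket=TEST_BUCKET)
    assert 'Contents' not in objects, 'Bucket should be empty'
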
args.pipe_cls,
output_bundle=args.output_bundle,
input_tags=input_tags,
output_tags=output_tags,
params=deser_user_params,
output_bundle_uuid=args.output_bundle_uuid,
force=args.force,
force_all=args.force_all,
workers=args.workers,
incremental_push=incremental_push,
incremental_pull=incremental_pull)
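# Decide what to push after the run: skip pushing entirely under incremental
# push or when args.no_push is set; otherwise optionally push uncommitted
# intermediate bundles before the final output bundle.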
if not incremental_push:
    if not args.no_push:
        if not args.no_push_intermediates:
            to_push = disdat.api.search(args.branch, is_committed=False, find_intermediates=True)
            for b in to_push:
                _commit_and_push(b)
        if result['did_work']:
            _logger.info("Pipeline ran. Committing and pushing output bundle UUID {}.".format(args.output_bundle_uuid))
            b = disdat.api.get(None, uuid=args.output_bundle_uuid)
            assert b is not None
            _commit_and_push(b)
        else:
            _logger.info("Pipeline ran but did no useful work (output bundle exists).")
    else:
        _logger.info("Pipeline ran but user specified not to push any bundles to remote context.")
else:
    _logger.info("Pipeline ran using incremental pushing.")
except RuntimeError as err:
    _logger.error('Failed to run pipeline: RuntimeError {}'.format(err))
task.processing_id()))
else:
    if pce.rerun:
        if verbose: print("Resolve_bundle: an upstream task is in the pce and is being re-run, so we need to re-run; getting a new output bundle.\n")
        new_output_bundle(pipe, data_context)
        return regen_bundle
# Upstream Task
# Resolve to a bundle
# Resolve to a bundle UUID and a processing name
# If it is an ordinary task in a workflow, we resolve via the processing name
if worker._is_external(task) and isinstance(task, ExternalDepTask):
    upstream_dep_uuid = task.uuid
    upstream_dep_processing_name = task.processing_name
else:
    found = api.search(data_context.get_local_name(), processing_name=task.processing_id())
    assert len(found) > 0
    local_bundle = found[0]  # the most recent with this processing_name
    upstream_dep_uuid = local_bundle.pb.uuid
    upstream_dep_processing_name = local_bundle.pb.processing_name
    assert upstream_dep_processing_name == task.processing_id()
""" Now we need to check if we should re-run this task because an upstream input exists and has been updated
Go through each of the inputs used for this current task.
POLICY
1.) if the date is more recent, it is "new" data.
2.) if it is older, we should require force (but currently do not and re-run).
XXX TODO: Add date to the depends_on pb data structure to enforce 2 XXX
"""
for tup in lng.pb.depends_on:
    if tup.hframe_proc_name == upstream_dep_processing_name and tup.hframe_uuid != upstream_dep_uuid:
        if verbose: print("Resolve_bundle: prior input bundle {} uuid {} has new uuid {}\n".format(