# (stray scan-tool banner from the original page scrape, kept as a comment so it cannot be parsed as code)
def test_pull(run_test):
    """Round-trip a bundle through a fresh moto S3 bucket, then rebind.

    Verifies the bucket and context start empty, runs the ``RemoteTest``
    pipeline, commits and pushes its output bundle, confirms objects landed
    in S3, and finally recreates the context with the remote re-bound.
    """
    client = boto3.client('s3')
    resource = boto3.resource('s3')
    resource.create_bucket(Bucket=TEST_BUCKET)
    bucket = resource.Bucket(TEST_BUCKET)

    # Preconditions: nothing in the bucket, nothing in the context.
    listing = client.list_objects(Bucket=TEST_BUCKET)
    assert 'Contents' not in listing, 'Bucket should be empty'
    assert len(api.search(TEST_CONTEXT)) == 0, 'Context should be empty'

    # Bind the remote, run the pipeline, then commit/push its output bundle.
    api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)
    api.apply(TEST_CONTEXT, RemoteTest)
    produced = api.get(TEST_CONTEXT, 'remote_test')
    assert produced.data == 'Hello'
    produced.commit()
    produced.push()

    # The push must have created at least one object in S3.
    listing = client.list_objects(Bucket=TEST_BUCKET)
    assert 'Contents' in listing, 'Bucket should not be empty'
    assert len(listing['Contents']) > 0, 'Bucket should not be empty'

    # Blow the context away, recreate it, and re-bind the remote.
    api.delete_context(context_name=TEST_CONTEXT)
    api.context(context_name=TEST_CONTEXT)
    api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)
def test_add_with_treat_as_bundle():
    """Verify incremental_push uploads pipeline outputs even when the run fails.

    Runs the ``CPush`` pipeline with ``incremental_push=True``; the pipeline
    is expected to raise, but the files it produced before failing should
    already have been pushed to the moto-backed S3 bucket.
    """
    api.delete_context(TEST_CONTEXT)
    api.context(context_name=TEST_CONTEXT)

    # Setup moto s3 resources
    s3_client = boto3.client('s3')
    s3_resource = boto3.resource('s3')
    s3_resource.create_bucket(Bucket=TEST_BUCKET)

    # Make sure bucket is empty
    objects = s3_client.list_objects(Bucket=TEST_BUCKET)
    assert 'Contents' not in objects, 'Bucket should be empty'

    # Bind remote context
    api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)

    # Try to run the pipeline - it is expected to fail. Deliberate best-effort:
    # we only care that incremental_push uploaded files before the failure,
    # which the assertions below verify. (Previously bound an unused `e`.)
    try:
        api.apply(TEST_CONTEXT, CPush, incremental_push=True)
    except Exception:
        pass

    # Collect the bare file names of every object pushed to the remote.
    objects = s3_client.list_objects(Bucket=TEST_BUCKET)
    keys = [obj['Key'].split('/')[-1] for obj in objects['Contents']]

    # Make sure files exist in S3
    for output_file in ['a.txt', 'b.txt']:
        assert output_file in keys, 'Pipeline should have pushed file'
def test_push(run_test):
    """Push the ``RemoteTest`` output bundle to a moto S3 remote.

    Checks the bucket and context start empty, runs the pipeline, commits
    and pushes the bundle, asserts objects exist in S3, then empties and
    deletes the bucket.
    """
    conn = boto3.client('s3')
    res = boto3.resource('s3')
    res.create_bucket(Bucket=TEST_BUCKET)
    bucket = res.Bucket(TEST_BUCKET)

    # Bucket and context must both start out empty.
    listing = conn.list_objects(Bucket=TEST_BUCKET)
    assert 'Contents' not in listing, 'Bucket should be empty'
    assert len(api.search(TEST_CONTEXT)) == 0, 'Context should be empty'

    # Bind the remote and run the pipeline.
    api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)
    api.apply(TEST_CONTEXT, RemoteTest)

    result = api.get(TEST_CONTEXT, 'remote_test')
    assert result.data == 'Hello'
    result.commit()
    result.push()

    # The push must have created at least one object in the bucket.
    listing = conn.list_objects(Bucket=TEST_BUCKET)
    assert 'Contents' in listing, 'Bucket should not be empty'
    assert len(listing['Contents']) > 0, 'Bucket should not be empty'

    # Clean up the moto bucket.
    bucket.objects.all().delete()
    bucket.delete()
# NOTE(review): orphaned fragment — the enclosing `def` appears to have been
# lost when this file's indentation was stripped; presumably the body of a
# push/pull round-trip test (confirm against the original test suite).
# Code kept byte-identical; comments only.
# Run test pipeline
api.apply(TEST_CONTEXT, CIP)
# Push bundles to remote
# Commit then push each output bundle so it exists on the bound remote.
for bundle_name in ['a', 'b', 'c']:
assert api.get(TEST_CONTEXT, bundle_name) is not None, 'Bundle should exist'
api.commit(TEST_CONTEXT, bundle_name)
api.push(TEST_CONTEXT, bundle_name)
# Blow away context and recreate
api.delete_context(TEST_CONTEXT)
assert TEST_CONTEXT not in api.ls_contexts()
api.context(context_name=TEST_CONTEXT)
api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)
assert api.search(TEST_CONTEXT) == [], 'Context should be empty'
# Pull bundles from remote
api.pull(TEST_CONTEXT)
# Make sure all bundle meta data comes down but data remains in S3
# Without localize=True, each pulled bundle's data should remain an
# s3:// link rather than a local file path.
for bundle_name in ['a', 'b', 'c']:
bundle = api.get(TEST_CONTEXT, bundle_name)
assert bundle is not None, 'Bundle should exist'
data_path = bundle.data['file'][0]
assert data_path.startswith('s3://'), 'Data should be in S3'
# Rerun pipeline
# presumably exercises incremental_pull against the just-pulled bundles —
# TODO confirm 'n' is a valid BIP parameter in the original test.
api.apply(TEST_CONTEXT, BIP, params={'n': 100}, incremental_pull=True)
# NOTE(review): orphaned fragment — the enclosing `def` was lost when this
# file's indentation was stripped; looks like the body of a non-managed-S3
# incremental_push test. Code kept byte-identical; comments only.
api.context(context_name=TEST_CONTEXT)
# Setup moto s3 resources
s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3')
s3_resource.create_bucket(Bucket=TEST_BUCKET)
s3_resource.create_bucket(Bucket=TEST_BUCKET_OTHER)
# Make sure bucket is empty
objects = s3_client.list_objects(Bucket=TEST_BUCKET)
assert 'Contents' not in objects, 'Bucket should be empty'
objects = s3_client.list_objects(Bucket=TEST_BUCKET_OTHER)
assert 'Contents' not in objects, 'Bucket should be empty'
# Bind remote context
api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)
# Apply
api.apply(TEST_CONTEXT, NonManagedS3, incremental_push=True)
print(api.cat(TEST_CONTEXT, 'b2'))
# Local context should not contain file if a remote exists.
b = api.search(TEST_CONTEXT, human_name='b2')[0]
assert not os.path.exists(b.data['file'][0]), 'Non Managed S3 file w/ remote should be copied to remote'
# Pulling with localize=True should materialize the file locally.
b.pull(localize=True)
assert os.path.exists(b.data['file'][0]), 'Non Managed S3 file after pull should be copied to local'
# Get objects from remote
# Strip key prefixes down to bare file names (the tail fragment that
# presumably asserted on `keys` is missing from this view).
objects = s3_client.list_objects(Bucket=TEST_BUCKET_OTHER)
keys = [o['Key'] for o in objects['Contents']]
keys = [key.split('/')[-1] for key in keys]
def test_zero_copy_s3_file(run_test):
    """Round-trip this source file through a bundle's managed S3 path.

    Copies ``__file__`` directly to the bundle's remote-managed location,
    re-fetches the bundle by uuid, localizes it, and checks the md5 of the
    pulled copy matches the original.
    """
    resource = boto3.resource('s3')
    resource.create_bucket(Bucket=TEST_BUCKET)
    api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)

    expected_md5 = md5_file(__file__)

    # Place this file directly at the bundle's managed remote path.
    with api.Bundle(TEST_CONTEXT, name=TEST_BUNDLE) as bundle:
        target = bundle.get_remote_file('test_s3_file.txt')
        aws_s3.cp_local_to_s3_file(__file__, target.path)
        bundle.add_data(target)
        bundle.add_tags({'info': 'added an s3 file'})
        saved_uuid = bundle.uuid

    # Re-fetch by uuid and localize the data back to disk.
    fetched = api.get(TEST_CONTEXT, None, uuid=saved_uuid)
    fetched.pull(localize=True)

    actual_md5 = md5_file(fetched.data)
    print(actual_md5)
    print(expected_md5)
    assert actual_md5 == expected_md5
""" Test the api.run() function.
1.) Create the container via the api
2.) Create a test context
3.) Call run locally
4.) Call run on AWS Batch (need to add MonkeyPatch)
"""
test_arg = [1000,2000,8000]
api.context(TEST_CONTEXT)
api.remote(TEST_CONTEXT, TEST_CONTEXT, TEST_REMOTE, force=True)
print ("--0: Create docker container")
api.dockerize(SETUP_DIR, PIPELINE_CLS)
print ("--1: Running container locally and storing results locally...")
retval = api.run(TEST_CONTEXT, TEST_CONTEXT, OUTPUT_BUNDLE, PIPELINE_CLS,
pipeline_args={'int_array': test_arg},
remote=TEST_REMOTE,
no_pull=True,
no_push=True)
print ("--1: 100 chars of RETVAL {}".format(retval[:100]))
b = api.get(TEST_CONTEXT, OUTPUT_BUNDLE)
assert(b is not None)
print ("--1: Pipeline tried to store {} and we found {}".format(test_arg, b.cat()))
assert(np.array_equal(b.cat(), test_arg))
b.rm()
def test_remote_no_push_managed_s3():
    """Applying the ``ManagedS3`` pipeline must raise when bound to a remote.

    Sets up a fresh context and an empty moto-backed bucket, binds the
    remote, and asserts that ``api.apply`` with ``ManagedS3`` raises.
    """
    api.delete_context(TEST_CONTEXT)
    api.context(context_name=TEST_CONTEXT)

    # Setup moto s3 resources
    s3_client = boto3.client('s3')
    s3_resource = boto3.resource('s3')
    s3_resource.create_bucket(Bucket=TEST_BUCKET)

    # Make sure bucket is empty
    objects = s3_client.list_objects(Bucket=TEST_BUCKET)
    assert 'Contents' not in objects, 'Bucket should be empty'

    # Bind remote context
    api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)

    # The captured ExceptionInfo was never used, so don't bind it.
    with pytest.raises(Exception):
        api.apply(TEST_CONTEXT, ManagedS3)
# NOTE(review): orphaned fragment — references `context_arg` and `remote_url`,
# which must be parameters of a lost enclosing function (presumably a CLI
# helper that binds a remote and returns True/False on success — confirm
# against the original source). Code kept byte-identical; comments only.
# Accept either "<remote>/<local>" or a bare "<local>" context argument.
contexts = context_arg.split('/')
if len(contexts) > 1:
remote_context = contexts[0]
local_context = contexts[1]
else:
local_context = contexts[0]
remote_context = local_context
# A missing URL is reported and treated as failure.
if remote_url is None:
_logger.error("Got an invalid URL {}".format(remote_url))
return False
# Best-effort bind: any failure from the api layer maps to False.
try:
disdat.api.remote(local_context, remote_context, remote_url)
except Exception:
return False
return True