Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
api.remote(TEST_CONTEXT, TEST_CONTEXT, REMOTE_URL, force=True)
with api.Bundle(TEST_CONTEXT, TEST_NAME, owner=getpass.getuser()) as b:
for i in range(3):
with b.add_file('output_{}'.format(i)).open('w') as of:
of.write("some text for the {} file".format(i))
b.commit().push()
b.rm()
b.pull(localize=False)
api.apply(TEST_CONTEXT, 'test_output', 'ConsumeExtDep', incremental_pull=True)
api.delete_context(TEST_CONTEXT, remote=True)
try:
# Run test pipeline
api.apply(TEST_CONTEXT, CPush, incremental_push=True)
except Exception as e:
pass
# Get objects from remote
objects = s3_client.list_objects(Bucket=TEST_BUCKET)
keys = [o['Key'] for o in objects['Contents']]
keys = [key.split('/')[-1] for key in keys]
# Make sure files exist in S3
for output_file in ['a.txt', 'b.txt']:
assert output_file in keys, 'Pipeline should have pushed file'
api.delete_context(TEST_CONTEXT)
# Bind remote context
api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)
# Run test pipeline
api.apply(TEST_CONTEXT, CIP)
# Push bundles to remote
for bundle_name in ['a', 'b', 'c']:
assert api.get(TEST_CONTEXT, bundle_name) is not None, 'Bundle should exist'
api.commit(TEST_CONTEXT, bundle_name)
api.push(TEST_CONTEXT, bundle_name)
# Blow away context and recreate
api.delete_context(TEST_CONTEXT)
assert TEST_CONTEXT not in api.ls_contexts()
api.context(context_name=TEST_CONTEXT)
api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)
assert api.search(TEST_CONTEXT) == [], 'Context should be empty'
# Pull bundles from remote
api.pull(TEST_CONTEXT)
# Make sure all bundle meta data comes down but data remains in S3
for bundle_name in ['a', 'b', 'c']:
bundle = api.get(TEST_CONTEXT, bundle_name)
assert bundle is not None, 'Bundle should exist'
data_path = bundle.data['file'][0]
def setup():
if TEST_CONTEXT in api.ls_contexts():
api.delete_context(context_name=TEST_CONTEXT)
api.context(context_name=TEST_CONTEXT)
api.add(TEST_CONTEXT, 'test_add_bundle', bundle_df_path, treat_file_as_bundle=True)
# Assert that data in bundle is a dataframe
b = api.get(TEST_CONTEXT, 'test_add_bundle')
assert(isinstance(b.data, pd.DataFrame))
# Add bundle dataframe with tags
tag = {'test': 'tag'}
api.add(TEST_CONTEXT, 'test_add_bundle', bundle_df_path, treat_file_as_bundle=True, tags=tag)
# Assert that data in bundle is a dataframe
b = api.get(TEST_CONTEXT, 'test_add_bundle')
assert(isinstance(b.data, pd.DataFrame))
assert b.tags == tag, 'Tags do not match'
api.delete_context(TEST_CONTEXT)
b = api.get(TEST_CONTEXT, 'PreMaker')
assert(b is not None)
pm_uuid = b.uuid
b.rm()
api.apply(TEST_CONTEXT, Root_1)
b = api.get(TEST_CONTEXT, 'PreMaker')
assert(b is not None)
assert(b.uuid != pm_uuid)
b = api.get(TEST_CONTEXT, 'Root_1')
assert(b is not None)
api.delete_context(TEST_CONTEXT)
b = api.get(TEST_CONTEXT, OUTPUT_BUNDLE)
print ("--3D: Pipeline tried to store {} and we found {}".format(test_arg, b.cat()))
assert(np.array_equal(b.cat(), test_arg))
b.rm()
print ("--4: Running with no submit ...")
print ("--4B: Reusing docker container")
print ("--4C: Submit Job on AWS Batch")
retval = api.run(TEST_CONTEXT, TEST_CONTEXT, OUTPUT_BUNDLE, PIPELINE_CLS,
pipeline_args={'int_array': test_arg},
remote=TEST_REMOTE,
backend='AWSBatch',
no_submit=True)
print ("--4C: RETVAL {}".format(retval))
api.delete_context(TEST_CONTEXT)
# Retrieve the bundle
b = api.get(TEST_CONTEXT, 'test_single_file')
# Assert the bundles contain the same data
bundle_hash, file_hash = get_hash(b.data[0]), get_hash(test_csv_path)
assert bundle_hash == file_hash, 'Hashes do not match'
assert b.tags == tag, 'Tags do not match'
# Remove test .csv
os.remove(test_csv_path)
# Assert that data still remains in the bundle
assert api.get(TEST_CONTEXT, 'test_single_file') is not None, 'Bundle should exist'
api.delete_context(TEST_CONTEXT)
f.write('this should not create a bundle')
# Assert the txt file exists
assert os.path.exists(test_txt_path)
# Try to add file to the bundle
with pytest.raises(AssertionError) as ex:
api.add(TEST_CONTEXT, 'bad_path', test_txt_path, treat_file_as_bundle=True)
# Assert Exited with error code of 1
assert ex.type == AssertionError
# Make sure bundle does not exist
assert api.get(TEST_CONTEXT, 'test_file_as_bundle_txt_file') is None, 'Bundle should not exist'
api.delete_context(TEST_CONTEXT)