Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test():
    """Apply Root2 and check the failure details carried by any exception.

    The test binds TEST_CONTEXT and applies Root2 into the output bundle
    'test_api_exit' with force=True and workers=2. If the apply raises, the
    exception is expected to expose a ``result`` mapping whose 'did_work'
    entry is truthy and whose 'success' entry is falsy. The value returned
    by apply (or None when it raised) is printed in every case.

    NOTE(review): earlier documentation for this test described an
    external-bundle scenario (DataMaker / PreMaker_auf_datamaker). Nothing
    in this body runs those steps -- confirm whether that description
    belongs to a different test.
    """
    api.context(TEST_CONTEXT)

    result = None
    try:
        result = api.apply(
            TEST_CONTEXT,
            Root2,
            output_bundle='test_api_exit',
            params={},
            force=True,
            workers=2,
        )
    except Exception as err:
        # The exception is expected to carry the apply outcome in `result`.
        outcome = err.result
        print("Got exception {} result {} ".format(err, outcome))
        assert outcome['did_work']
        assert not outcome['success']
    finally:
        print("API apply returned {}".format(result))
def test():
    """Exercise api.run() and verify the bundle it stores.

    Steps performed:
    1.) Bind the test context and attach the test remote to it
    2.) Create the container via api.dockerize()
    3.) Call api.run() with no_pull and no_push set
    4.) Fetch the output bundle and compare its contents to the input array

    Running on AWS Batch is not exercised here (it would need a MonkeyPatch).
    """
    expected_ints = [1000, 2000, 8000]

    api.context(TEST_CONTEXT)
    api.remote(TEST_CONTEXT, TEST_CONTEXT, TEST_REMOTE, force=True)

    print("--0: Create docker container")
    api.dockerize(SETUP_DIR, PIPELINE_CLS)

    print("--1: Running container locally and storing results locally...")
    run_options = dict(
        pipeline_args={'int_array': expected_ints},
        remote=TEST_REMOTE,
        no_pull=True,
        no_push=True,
    )
    run_output = api.run(
        TEST_CONTEXT, TEST_CONTEXT, OUTPUT_BUNDLE, PIPELINE_CLS, **run_options
    )
    preview = run_output[:100]
    print("--1: 100 chars of RETVAL {}".format(preview))

    bundle = api.get(TEST_CONTEXT, OUTPUT_BUNDLE)
    assert bundle is not None

    print("--1: Pipeline tried to store {} and we found {}".format(expected_ints, bundle.cat()))
    # The stored bundle contents must match the array passed to the pipeline.
    assert np.array_equal(bundle.cat(), expected_ints)
def test():
    """Check that mark_force works for tasks.

    There are two tasks, one depending on the other; the upstream is marked
    mark_force and should always run. The test applies 'A_2' three times in
    a fresh context and compares the uuid of bundle 'B' between runs:

    * runs "One" and "Two" must yield different uuids
    * run "Three" (with set_ext_dep=True) must yield the same uuid as "Two"
    """
    def apply_and_fetch(label, use_ext_dep=False):
        # Apply 'A_2', then report and return the current 'B' bundle.
        api.apply(TEST_CONTEXT, 'A_2', params={'set_ext_dep': use_ext_dep})
        bundle = api.get(TEST_CONTEXT, 'B')
        print("Run {}: b.creation_date {} b.uuid {}".format(
            label, bundle.creation_date, bundle.uuid))
        return bundle

    # Start from an empty context so earlier runs cannot influence the uuids.
    api.delete_context(TEST_CONTEXT)
    api.context(TEST_CONTEXT)

    uuid_one = apply_and_fetch("One").uuid
    uuid_two = apply_and_fetch("Two").uuid
    assert uuid_one != uuid_two

    uuid_three = apply_and_fetch("Three", use_ext_dep=True).uuid
    assert uuid_two == uuid_three

    api.delete_context(TEST_CONTEXT)
def test(run_test):
    """One task produces a bundle and another task requires it.

    1.) Apply DataMaker, which is expected to leave a 'PreMaker' bundle
    2.) Check that 'PreMaker' exists, remember its uuid, and remove it
    3.) Apply Root_1, which needs DataMaker (external dep) and PreMaker
    4.) Check that 'PreMaker' exists again with a different uuid and that a
        'Root_1' bundle was produced

    `run_test` is not used in the body -- presumably a fixture that prepares
    the environment; verify against the fixture definition.
    """
    api.context(TEST_CONTEXT)

    api.apply(TEST_CONTEXT, DataMaker, params={'int_array': [1000, 2000, 3000]})
    premaker = api.get(TEST_CONTEXT, 'PreMaker')
    assert premaker is not None
    original_uuid = premaker.uuid
    premaker.rm()

    api.apply(TEST_CONTEXT, Root_1)

    # A different uuid means 'PreMaker' was produced again by the second apply.
    rebuilt = api.get(TEST_CONTEXT, 'PreMaker')
    assert rebuilt is not None
    assert rebuilt.uuid != original_uuid

    root_bundle = api.get(TEST_CONTEXT, 'Root_1')
    assert root_bundle is not None
def setup():
    """Give the tests a freshly created TEST_CONTEXT.

    Any context already registered under that name is deleted first, then
    the context is created again.
    """
    already_present = TEST_CONTEXT in api.ls_contexts()
    if already_present:
        api.delete_context(context_name=TEST_CONTEXT)

    api.context(context_name=TEST_CONTEXT)
def test_create_context():
    """Check that a context can be created and then deleted through the api.

    Using its own context name ('__test__'), the test verifies via
    api.ls_contexts() that the name is absent before creation, present after
    api.context(), and absent again after api.delete_context().
    """
    context_name = '__test__'

    assert context_name not in api.ls_contexts(), 'Context exists'

    api.context(context_name)
    # This message is shown when the context is missing after creation, so it
    # must describe that failure (the old text said the context "does exists").
    assert context_name in api.ls_contexts(), 'Test context does not exist'

    api.delete_context(context_name=context_name)
    assert context_name not in api.ls_contexts(), 'Test context exists'
def test_remote_no_push_managed_s3():
"""Expect api.apply() of ManagedS3 to raise once a remote is bound.

The test recreates TEST_CONTEXT, creates the TEST_BUCKET bucket through
boto3, checks that the bucket listing has no 'Contents' entry, binds the
context to TEST_BUCKET_URL under the name TEST_REMOTE, and then requires
that applying ManagedS3 raises an Exception.

NOTE(review): the boto3 calls are presumably intercepted by a moto mock
set up outside this function -- confirm. The reason the apply is expected
to fail is not visible here; the test name suggests a managed S3 path
used without pushing -- confirm against ManagedS3.
"""
# Start from a fresh context so no earlier bundles or remotes are present.
api.delete_context(TEST_CONTEXT)
api.context(context_name=TEST_CONTEXT)
# Setup moto s3 resources
s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3')
s3_resource.create_bucket(Bucket=TEST_BUCKET)
# Make sure bucket is empty
objects = s3_client.list_objects(Bucket=TEST_BUCKET)
# The listing response only has a 'Contents' key when objects exist.
assert 'Contents' not in objects, 'Bucket should be empty'
# Bind remote context
api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)
# Any exception type satisfies the check; the captured info `e` is not inspected here.
with pytest.raises(Exception) as e:
api.apply(TEST_CONTEXT, ManagedS3)