import logging
import os
import shutil
from urllib.parse import urlparse

_logger = logging.getLogger(__name__)


def copy_source_to_target(target, source_url):  # function name assumed; the original signature is not in the snippet
    """ Copy source data to a Luigi target.

    Args:
        target (`luigi.Target`): A Luigi Target object
        source_url (str): Source data URL, accepts file:// and s3://

    Returns:
        None
    """
    url = urlparse(source_url)

    if url.scheme.lower() == 'file':
        _logger.info('Copying {} from file {}'.format(target.path, url.path))
        if not os.path.exists(url.path):
            raise RuntimeError('Unable to find source file {}'.format(url.path))
        shutil.copyfile(url.path, target.path)
    elif url.scheme.lower() == 's3':
        _logger.info('Downloading to {} from {}'.format(target.path, url.geturl()))
        s3.get_s3_file(url.geturl(), target.path)  # s3 is the project's S3 helper module, imported elsewhere in the source
    else:
        _logger.info('Assuming file: Copying {} from file {}'.format(target.path, url.path))
        if not os.path.exists(url.path):
            raise RuntimeError('Unable to find source file {}'.format(url.path))
        shutil.copyfile(url.path, target.path)
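
# A quick, illustrative check of the scheme dispatch above; these are plain
# urllib results (Python 3), not Disdat-specific behavior.
assert urlparse('file:///tmp/data.csv').scheme == 'file'
assert urlparse('file:///tmp/data.csv').path == '/tmp/data.csv'
assert urlparse('s3://bucket/key.csv').scheme == 's3'
assert urlparse('/tmp/bare_path.csv').scheme == ''  # bare paths land in the else branch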
api.context(context_name=TEST_CONTEXT)

# Setup moto s3 resources
s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3')
s3_resource.create_bucket(Bucket=TEST_BUCKET)

# Make sure bucket is empty
objects = s3_client.list_objects(Bucket=TEST_BUCKET)
assert 'Contents' not in objects, 'Bucket should be empty'

# Bind remote context
api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)

# Run test pipeline
api.apply(TEST_CONTEXT, CIP)

# Push bundles to remote
for bundle_name in ['a', 'b', 'c']:
    assert api.get(TEST_CONTEXT, bundle_name) is not None, 'Bundle should exist'
    api.commit(TEST_CONTEXT, bundle_name)
    api.push(TEST_CONTEXT, bundle_name)

# Blow away context and recreate
api.delete_context(TEST_CONTEXT)
assert TEST_CONTEXT not in api.ls_contexts()
api.context(context_name=TEST_CONTEXT)
api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)
assert api.search(TEST_CONTEXT) == [], 'Context should be empty'
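
# The boto3 calls above only make sense against a mocked S3; a minimal sketch
# of how the moto interception is typically wired up (illustrative fixture,
# assumes moto 4.x where mock_s3 exists; not part of the original source):
import boto3
import pytest
from moto import mock_s3

@pytest.fixture
def moto_boto():
    with mock_s3():  # every boto3 S3 call inside the block hits moto's in-memory S3
        yield boto3.resource('s3')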
def test_pull(run_test):
    s3_client = boto3.client('s3')
    s3_resource = boto3.resource('s3')
    s3_resource.create_bucket(Bucket=TEST_BUCKET)
    bucket = s3_resource.Bucket(TEST_BUCKET)

    objects = s3_client.list_objects(Bucket=TEST_BUCKET)
    assert 'Contents' not in objects, 'Bucket should be empty'
    assert len(api.search(TEST_CONTEXT)) == 0, 'Context should be empty'

    api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)
    api.apply(TEST_CONTEXT, RemoteTest)
    bundle = api.get(TEST_CONTEXT, 'remote_test')
    assert bundle.data == 'Hello'
    bundle.commit()
    bundle.push()

    objects = s3_client.list_objects(Bucket=TEST_BUCKET)
    assert 'Contents' in objects, 'Bucket should not be empty'
    assert len(objects['Contents']) > 0, 'Bucket should not be empty'

    api.delete_context(context_name=TEST_CONTEXT)
    api.context(context_name=TEST_CONTEXT)
    api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)
    api.pull(TEST_CONTEXT)
    pulled_bundles = api.search(TEST_CONTEXT)
    assert len(pulled_bundles) > 0, 'Pulled context should contain the pushed bundle'
def test_push(run_test):
    s3_client = boto3.client('s3')
    s3_resource = boto3.resource('s3')
    s3_resource.create_bucket(Bucket=TEST_BUCKET)
    bucket = s3_resource.Bucket(TEST_BUCKET)

    objects = s3_client.list_objects(Bucket=TEST_BUCKET)
    assert 'Contents' not in objects, 'Bucket should be empty'
    assert len(api.search(TEST_CONTEXT)) == 0, 'Context should be empty'

    api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)
    api.apply(TEST_CONTEXT, RemoteTest)
    bundle = api.get(TEST_CONTEXT, 'remote_test')
    assert bundle.data == 'Hello'
    bundle.commit()
    bundle.push()

    objects = s3_client.list_objects(Bucket=TEST_BUCKET)
    assert 'Contents' in objects, 'Bucket should not be empty'
    assert len(objects['Contents']) > 0, 'Bucket should not be empty'

    bucket.objects.all().delete()
    bucket.delete()
def test():
    """ The purpose of this test is to have one task that produces a bundle,
    and another task that requires it.

    1.) Create the external dep -- also creates PreMaker_auf_datamaker
        dsdt apply - - test_external_bundle.DataMaker --int_array '[1000,2000,3000]'
    2.) Remove PreMaker_auf_datamaker
        dsdt rm PreMaker_auf_datamaker
    3.) Try to run Root -- it should find DataMaker but not re-create it or PreMaker_auf_datamaker
    """
    api.context(TEST_CONTEXT)

    result = None
    try:
        result = api.apply(TEST_CONTEXT, Root2, output_bundle='test_api_exit', params={}, force=True, workers=2)
    except Exception as e:
        print("Got exception {} result {}".format(e, e.result))
        assert e.result['did_work']
        assert not e.result['success']
    finally:
        print("API apply returned {}".format(result))
@pytest.fixture  # assumed: run_test is consumed as a pytest fixture by the tests above
def run_test():
    # Remove test context before running test
    setup()
    yield
    api.delete_context(context_name=TEST_CONTEXT)
# b is a Bundle created earlier in the test (not shown in this snippet)
for i in range(3):
    with b.add_file('output_{}'.format(i)).open('w') as of:
        of.write("some text for the {} file".format(i))

b.commit().push()
b.rm()
b.pull(localize=False)

api.apply(TEST_CONTEXT, 'test_output', 'ConsumeExtDep', incremental_pull=True)
api.delete_context(TEST_CONTEXT, remote=True)
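
# As the flags suggest, pull(localize=False) restores bundle metadata without
# downloading file payloads, and incremental_pull=True lets the pipeline fetch
# those payloads on demand as tasks consume them (a behavioral reading of the
# snippet, not a statement of the full Disdat API).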
class ConsumeExtDep(PipeTask):
    """ Consume files from an external dependency
    """
    def pipe_requires(self):
        """ We depend on a manually created bundle that
        is parameterized by its name and its owner
        """
        self.add_external_dependency("input_files",
                                     api.BundleWrapperTask,
                                     {'name': TEST_NAME,
                                      'owner': getpass.getuser()})
    def pipe_run(self, input_files=None):
        """ For each file, print out its name and contents.
        """
        max_len = 0
        # The body is truncated in the source; a minimal sketch, assuming
        # input_files yields local file paths:
        for path in input_files:
            contents = open(path).read()
            max_len = max(max_len, len(contents))
            print('{}: {}'.format(path, contents))
class DataMaker(PipeTask):
    """ Run this by itself.
    Then B requires DataMaker as external, and A. """

    int_array = luigi.ListParameter(default=[0, 1])

    def pipe_requires(self, pipeline_input=None):
        self.set_bundle_name("DataMaker")
        return

    def pipe_run(self, pipeline_input=None):
        return np.array(self.int_array)


class Root(PipeTask):
    int_array = luigi.ListParameter(default=None)

    def pipe_requires(self, pipeline_input=None):
        self.add_dependency('datamaker', DataMaker, {'int_array': self.int_array})

    def pipe_run(self, pipeline_input=None, datamaker=None):
        print("Root received a datamaker {}".format(datamaker))
        return datamaker.mean()
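
# A minimal, illustrative way to drive the pipeline above, following the
# api.apply(...) conventions used elsewhere in this file (the bundle name
# 'Root' is an assumption):
#
#   api.context(TEST_CONTEXT)
#   api.apply(TEST_CONTEXT, Root, params={'int_array': [10, 20, 30]})
#   b = api.get(TEST_CONTEXT, 'Root')
#   print(b.data)  # mean of the DataMaker array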
if __name__ == '__main__':
    test()
def test():
    """ Test the api.run() function.

    1.) Create the container via the api
    2.) Create a test context
    3.) Call run locally
    4.) Call run on AWS Batch (need to add MonkeyPatch)
    """
    test_arg = [1000, 2000, 8000]

    api.context(TEST_CONTEXT)
    api.remote(TEST_CONTEXT, TEST_CONTEXT, TEST_REMOTE, force=True)

    print("--0: Create docker container")
    api.dockerize(SETUP_DIR, PIPELINE_CLS)

    print("--1: Running container locally and storing results locally...")
    retval = api.run(TEST_CONTEXT, TEST_CONTEXT, OUTPUT_BUNDLE, PIPELINE_CLS,
                     pipeline_args={'int_array': test_arg},
                     remote=TEST_REMOTE,
                     no_pull=True,
                     no_push=True)
    print("--1: 100 chars of RETVAL {}".format(retval[:100]))

    b = api.get(TEST_CONTEXT, OUTPUT_BUNDLE)
    assert b is not None
    print("--1: Pipeline tried to store {} and we found {}".format(test_arg, b.cat()))
    assert np.array_equal(b.cat(), test_arg)
def test():
    """ This tests if mark_force works for tasks.
    We have two tasks. One depends on the other. The upstream is marked
    mark_force and should always run.
    """
    def run_and_get(name, do_ext=False):
        api.apply(TEST_CONTEXT, 'A_2', params={'set_ext_dep': do_ext})
        b = api.get(TEST_CONTEXT, 'B')
        print("Run {}: b.creation_date {} b.uuid {}".format(name, b.creation_date, b.uuid))
        return b

    api.delete_context(TEST_CONTEXT)
    api.context(TEST_CONTEXT)

    b = run_and_get("One")
    first_uuid = b.uuid

    b = run_and_get("Two")
    assert first_uuid != b.uuid
    second_uuid = b.uuid

    b = run_and_get("Three", do_ext=True)
    assert second_uuid == b.uuid

    api.delete_context(TEST_CONTEXT)
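
# Expected behavior exercised above, as the assertions read: without the
# external dependency, the mark_force'd upstream re-runs every time, so B is
# rebuilt with a new uuid; with set_ext_dep=True the existing B bundle is
# bound as an external dependency and reused, leaving its uuid unchanged.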