def test_push(run_test):
    s3_client = boto3.client('s3')
    s3_resource = boto3.resource('s3')
    s3_resource.create_bucket(Bucket=TEST_BUCKET)
    bucket = s3_resource.Bucket(TEST_BUCKET)

    # The remote bucket and the local context both start out empty
    objects = s3_client.list_objects(Bucket=TEST_BUCKET)
    assert 'Contents' not in objects, 'Bucket should be empty'
    assert len(api.search(TEST_CONTEXT)) == 0, 'Context should be empty'

    # Bind the context to the S3 remote, run the pipeline, and fetch its output bundle
    api.remote(TEST_CONTEXT, TEST_REMOTE, TEST_BUCKET_URL)
    api.apply(TEST_CONTEXT, RemoteTest)
    bundle = api.get(TEST_CONTEXT, 'remote_test')
    assert bundle.data == 'Hello'

    bundle.commit()
    bundle.push()

    # After the push the bundle's objects should be present in the bucket
    objects = s3_client.list_objects(Bucket=TEST_BUCKET)
    assert 'Contents' in objects, 'Bucket should not be empty'
    assert len(objects['Contents']) > 0, 'Bucket should not be empty'

    # Clean up the test bucket
    bucket.objects.all().delete()
    bucket.delete()
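# A hypothetical sketch of the pieces test_push assumes but that are not shown in this
# snippet: the module-level constants and the RemoteTest pipeline whose output is the
# string 'Hello'. The names mirror how they are used above; the values and the pipeline
# body are assumptions.
from disdat import api
from disdat.pipe import PipeTask

TEST_CONTEXT = 'test_context'
TEST_REMOTE = 'test_remote'
TEST_BUCKET = 'test-bucket'
TEST_BUCKET_URL = 's3://{}'.format(TEST_BUCKET)

class RemoteTest(PipeTask):
    def pipe_requires(self):
        # Name the output bundle so api.get(TEST_CONTEXT, 'remote_test') can find it
        self.set_bundle_name('remote_test')

    def pipe_run(self):
        # The test asserts bundle.data == 'Hello'
        return 'Hello'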
@pytest.fixture
def run_test():
    # Remove test context before running test
    setup()
    yield
    # Tear down: remove the test context once the test completes
    api.delete_context(context_name=TEST_CONTEXT)
def test_run_local_container(run_test, build_container_setup_only):
""" Run the local container.
Test if it runs, test if it re-runs all steps, test if it re-runs last step.
"""
retval = api.run(SETUP_DIR,
TEST_CONTEXT,
PIPELINE_CLS
)
b_b = api.get(TEST_CONTEXT, 'B')
assert b_b is not None
b_a = api.get(TEST_CONTEXT, 'A')
assert b_a is not None
assert b_a.data == sum(COMMON_DEFAULT_ARGS)
# Re-run with force all
retval = api.run(SETUP_DIR,
TEST_CONTEXT,
PIPELINE_CLS,
{'int_array': [1, 2, 3]},
force_all=True
)
b_b_f = api.get(TEST_CONTEXT, 'B')
assert b_b_f is not None
assert b_b.uuid != b_b_f.uuid
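# A hypothetical sketch of a pipeline shaped like the PIPELINE_CLS this container test
# runs: task A sums the int_array parameter (so A's data equals sum(COMMON_DEFAULT_ARGS)
# on a default run) and task B depends on A. The class names match the bundle names
# looked up above; the default values and B's computation are assumptions.
import luigi
from disdat.pipe import PipeTask

COMMON_DEFAULT_ARGS = [0, 1, 2]

class A(PipeTask):
    int_array = luigi.ListParameter(default=COMMON_DEFAULT_ARGS)

    def pipe_requires(self):
        self.set_bundle_name('A')

    def pipe_run(self, **kwargs):
        return sum(self.int_array)

class B(PipeTask):
    int_array = luigi.ListParameter(default=COMMON_DEFAULT_ARGS)

    def pipe_requires(self):
        self.set_bundle_name('B')
        # Upstream results arrive in pipe_run as keyword args named here
        self.add_dependency('a', A, {'int_array': self.int_array})

    def pipe_run(self, **kwargs):
        return 2 * kwargs['a']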
def test_ord_external_dependency(run_test):
    uuid = create_bundle_from_pipeline()

    api.apply(TEST_CONTEXT, PipelineA)
    result = api.apply(TEST_CONTEXT, PipelineA)
    assert result['success'] is True
    assert result['did_work'] is False
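# A hypothetical sketch of the create_bundle_from_pipeline helper used above: run an
# upstream pipeline once and return the uuid of the bundle it produced, so PipelineA can
# depend on it as an already-existing external bundle. The pipeline class, its output,
# and the bundle name below are assumptions.
from disdat import api
from disdat.pipe import PipeTask

class ExternalPipeline(PipeTask):
    def pipe_requires(self):
        self.set_bundle_name('external_bundle')

    def pipe_run(self):
        return [1, 2, 3]

def create_bundle_from_pipeline():
    api.apply(TEST_CONTEXT, ExternalPipeline)
    return api.get(TEST_CONTEXT, 'external_bundle').uuid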
def setup():
    if TEST_CONTEXT in api.ls_contexts():
        api.delete_context(context_name=TEST_CONTEXT)

    api.context(context_name=TEST_CONTEXT)
def test_single_file(tmpdir):
    # Create Context
    api.context(TEST_CONTEXT)

    # Create test .csv file
    test_csv_path = os.path.join(str(tmpdir), 'test.csv')
    df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})
    df.to_csv(test_csv_path)

    # Assert the csv file exists
    assert os.path.exists(test_csv_path)

    # Add the file to the bundle
    api.add(TEST_CONTEXT, 'test_single_file', test_csv_path)

    # Retrieve the bundle
    b = api.get(TEST_CONTEXT, 'test_single_file')

    # Assert the bundles contain the same data
    bundle_hash, file_hash = get_hash(b.data[0]), get_hash(test_csv_path)
    assert bundle_hash == file_hash, 'Hashes do not match'

    # Test with tags
    tag = {'test': 'tag'}
    api.add(TEST_CONTEXT, 'test_single_file', test_csv_path, tags=tag)

    # Retrieve the bundle
    b = api.get(TEST_CONTEXT, 'test_single_file')

    # Assert the bundles contain the same data
    bundle_hash, file_hash = get_hash(b.data[0]), get_hash(test_csv_path)
    assert bundle_hash == file_hash, 'Hashes do not match'
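# A minimal sketch of the get_hash helper test_single_file relies on but that is not
# shown in this snippet; hashing the file contents with md5 is an assumption, any digest
# would do for the comparison.
import hashlib

def get_hash(path):
    with open(path, 'rb') as f:
        return hashlib.md5(f.read()).hexdigest()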
def test_requires(run_test):
    api.apply(TEST_CONTEXT, b, params={})
def _context_and_remote(context_name, remote=None):
"""Create a new Disdat context and bind remote if not None.
Check environment for 'LOCAL_EXECUTION', which should exist and be True if we are running
a container in an existing .disdat environment (e.g., on someone's laptop).
If so, do not take any actions that would change the state of the users CLI. That is, do not
switch contexts.
Args:
context_name (str): A fully-qualified context name. remote-context/local-context or local-context
remote (str): S3 remote name.
"""
retval = disdat.api.context(context_name)
if retval == 1: # branch exists
_logger.warn("Entrypoint found existing local context {} ".format(context_name))
_logger.warn("Entrypoint not switching and ignoring directive to change to remote context {}".format(remote))
elif retval == 0: # just made a new branch
if remote is not None:
_logger.info("Entrypoint made a new context {}, attaching remote {}".format(context_name, remote))
_remote(context_name, remote)
else:
_logger.error("Entrypoint got non standard retval {} from api.context({}) command.".format(retval, context_name))
return False
if disdat.common.LOCAL_EXECUTION not in os.environ:
disdat.api.switch(context_name)
else:
_logger.info("Container running locally (not in a cloud provider, aka AWS). Not switching contexts")
# Copy a local file:// source path into the managed output target's path
shutil.copyfile(url.path, target.path)
def pipe_run(self):
"""Download data from a source blob URL.
Args:
pipeline_input (`pandas.DataFrame`): A single-row, single-column dataframe with a remote URL
"""
source_url = self._validate_and_get_input_url()
target = self.create_output_file(os.path.basename(source_url))
Download._download_blob(target, source_url)
return {self.OUTPUT_FILE_KEY: [target.path]}
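# A hypothetical sketch of the _download_blob helper that pipe_run calls above (it would
# live on the Download class); only the shutil.copyfile fragment appears in this snippet,
# so the URL-scheme handling below is an assumption.
import shutil
from urllib.parse import urlparse

@staticmethod
def _download_blob(target, source_url):
    """Copy the object at source_url into the local path behind the Luigi target."""
    url = urlparse(source_url)
    if url.scheme in ('', 'file'):
        # Local path or file:// URL: a plain filesystem copy is enough
        shutil.copyfile(url.path, target.path)
    else:
        # Remote schemes (e.g., s3:// or http://) are out of scope for this sketch
        raise NotImplementedError("unsupported scheme: {}".format(url.scheme))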
if __name__ == "__main__":
    api.apply('examples', 'Download', params={'input_url': './download.py'})