    Args:
        target (`luigi.Target`): A Luigi Target object
        source_url (str): Source data URL, accepts file:// and s3://

    Returns:
        None
    """
    url = urlparse(source_url)

    if url.scheme.lower() == 'file':
        _logger.info('Copying {} from file {}'.format(target.path, url.path))
        if not os.path.exists(url.path):
            raise RuntimeError('Unable to find source file {}'.format(url.path))
        shutil.copyfile(url.path, target.path)

    elif url.scheme.lower() == 's3':
        _logger.info('Downloading to {} from {}'.format(target.path, url.geturl()))
        s3.get_s3_file(url.geturl(), target.path)

    else:
        _logger.info('Assuming file: Copying {} from file {}'.format(target.path, url.path))
        if not os.path.exists(url.path):
            raise RuntimeError('Unable to find source file {}'.format(url.path))
        shutil.copyfile(url.path, target.path)
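# A minimal usage sketch for the copy-in helper above. The enclosing function's
# name and argument order are not shown in this excerpt, so copy_in_data(target,
# source_url) below is an assumed name/signature used purely for illustration.
import luigi

def example_copy_in():
    target = luigi.LocalTarget('/tmp/output.csv')           # hypothetical local Luigi target
    copy_in_data(target, 'file:///tmp/input.csv')           # file:// source -> shutil.copyfile
    copy_in_data(target, 's3://my-bucket/path/input.csv')   # s3:// source -> s3.get_s3_file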
def test_copy_in_s3_file(run_test):
    """ Test copying in s3 file

    The file should be copied into the local context
    """
    s3_resource = boto3.resource('s3')
    s3_resource.create_bucket(Bucket=TEST_BUCKET)

    # Copy a local file to moto s3 bucket
    saved_md5 = md5_file(__file__)
    aws_s3.put_s3_file(__file__, TEST_BUCKET_URL)

    s3_file = os.path.join(TEST_BUCKET_URL, os.path.basename(__file__))

    with api.Bundle(TEST_CONTEXT, name=TEST_BUNDLE) as b:
        b.add_data(s3_file)
        b.add_tags({'info': 'added an s3 file'})

    saved_uuid = b.uuid

    b = api.get(TEST_CONTEXT, None, uuid=saved_uuid)
    md5 = md5_file(b.data)
    print(md5)
    print(saved_md5)
    assert md5 == saved_md5
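# The test above relies on an md5_file() helper that is not shown in this
# excerpt. A minimal sketch of what such a helper typically looks like
# (an assumption, not the project's actual implementation):
import hashlib

def md5_file(path):
    """Return the hex MD5 digest of a file, reading it in chunks."""
    digest = hashlib.md5()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(8192), b''):
            digest.update(chunk)
    return digest.hexdigest()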
    # args are more-or-less the same as the ones used to execute
    # locally using 'dsdt run'

    # Create a Job Definition and upload it.
    # We create per-user job definitions so multiple users do not clobber each other.
    # In addition, we never re-use a job definition, since the user may update
    # the vcpu or memory requirements and those are stuck in the job definition.
    job_definition_name = aws.batch_get_job_definition_name(pipeline_image_name)

    if disdat_config.parser.has_option(_MODULE_NAME, 'aws_batch_job_definition'):
        job_definition_name = disdat_config.parser.get(_MODULE_NAME, 'aws_batch_job_definition')

    # TODO: Look through all of history to find one that matches?
    # TODO: Delete old jobs here or let user do it?
    job_definition_obj = aws.batch_get_latest_job_definition(job_definition_name)

    if (job_definition_obj is not None and
            job_definition_obj['containerProperties']['image'] == fq_repository_name and
            job_definition_obj['containerProperties']['vcpus'] == vcpus and
            job_definition_obj['containerProperties']['memory'] == memory and
            check_role_arn(job_definition_obj, job_role_arn)):

        job_definition_fqn = aws.batch_extract_job_definition_fqn(job_definition_obj)
        _logger.info("Re-using prior AWS Batch run job definition: {}".format(job_definition_obj))

    else:
        # Whether None or doesn't match, make a new one.
        job_definition_obj = aws.batch_register_job_definition(job_definition_name, fq_repository_name,
                                                               vcpus=vcpus, memory=memory, job_role_arn=job_role_arn)

        job_definition_fqn = aws.batch_get_job_definition(job_definition_name)
        _logger.info("New AWS Batch run job definition {}".format(job_definition_fqn))

    if no_submit:
        # Return the job description object
        return job_definition_obj
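# aws.batch_register_job_definition() is not shown in this excerpt. It
# presumably wraps the boto3 Batch API roughly as sketched below (an
# assumption; the real helper's parameter plumbing may differ):
import boto3

def batch_register_job_definition_sketch(job_definition_name, fq_repository_name,
                                         vcpus=1, memory=2000, job_role_arn=None):
    """Register a container job definition whose properties the caller can later compare."""
    container_properties = {
        'image': fq_repository_name,
        'vcpus': vcpus,
        'memory': memory,
    }
    if job_role_arn is not None:
        container_properties['jobRoleArn'] = job_role_arn
    client = boto3.client('batch')
    return client.register_job_definition(
        jobDefinitionName=job_definition_name,
        type='container',
        containerProperties=container_properties,
    )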
"""
disdat_config = DisdatConfig.instance()
repository_prefix = None
if disdat_config.parser.has_option('docker', 'repository_prefix'):
repository_prefix = disdat_config.parser.get('docker', 'repository_prefix')
if is_sagemaker:
repository_name = common.make_sagemaker_project_repository_name(repository_prefix, pipeline_setup_file)
else:
repository_name = common.make_project_repository_name(repository_prefix, pipeline_setup_file)
# Figure out the fully-qualified repository name, i.e., the name
# including the registry.
registry_name = disdat_config.parser.get('docker', 'registry').strip('/')
if registry_name == '*ECR*':
fq_repository_name = aws.ecr_get_fq_repository_name(repository_name)
else:
fq_repository_name = '{}/{}'.format(registry_name, repository_name)
return fq_repository_name
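# Illustrative mapping only (hypothetical registry and repository values): how
# the 'docker.registry' setting above determines the fully-qualified name.
registry_examples = {
    '*ECR*': '123456789012.dkr.ecr.us-east-1.amazonaws.com/disdat-myproject',  # via aws.ecr_get_fq_repository_name
    'docker.example.com/team': 'docker.example.com/team/disdat-myproject',     # '{}/{}'.format(registry_name, repository_name)
}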
print(("DataContext: copy_in_files found s3 link {} not present!".format(src_path)))
print ("It is likely that this bundle existed on another remote branch and ")
print ("was not localized before changing remotes.")
raise Exception("copy_in_files: bad localized bundle push.")
continue
try:
if not os.path.isdir(src_path):
o = urllib.parse.urlparse(src_path)
if o.scheme == 's3':
# s3 to s3
if dst_scheme == 's3':
aws_s3.cp_s3_file(src_path, os.path.dirname(dst_file))
elif dst_scheme != 'db': # assume 'file'
aws_s3.get_s3_file(src_path, dst_file)
else:
raise Exception("copy_in_files: copy s3 to unsupported scheme {}".format(dst_scheme))
elif o.scheme == 'db': # left for back compat for now
_logger.debug("Skipping a db file on bundle add")
elif o.scheme == 'file':
if dst_scheme == 's3':
# local to s3
aws_s3.put_s3_file(o.path, os.path.dirname(dst_file))
elif dst_scheme != 'db': # assume 'file'
# local to local
shutil.copy(o.path, os.path.dirname(dst_file))
else:
raise Exception("copy_in_files: copy local file to unsupported scheme {}".format(dst_scheme))
_logger.info("New AWS Batch run job definition {}".format(job_definition_fqn))
if no_submit:
# Return the job description object
return job_definition_obj
job_queue = disdat_config.parser.get(_MODULE_NAME, 'aws_batch_queue')
container_overrides = {'command': arglist}
# Through the magic of boto3_session_cache, the client in our script
# here can get at AWS profiles and temporary AWS tokens created in
# part from MFA tokens generated through the user's shells; we don't
# have to write special code of our own to deal with authenticating
# with AWS.
client = b3.client('batch', region_name=aws.profile_get_region())
# A bigger problem might be that the IAM role executing the job on
# a batch EC2 instance might not have access to the S3 remote. To
# get around this, allow the user to create some temporary AWS
# credentials.
if aws_session_token_duration > 0 and job_role_arn is None:
sts_client = b3.client('sts')
try:
token = sts_client.get_session_token(DurationSeconds=aws_session_token_duration)
credentials = token['Credentials']
container_overrides['environment'] = [
{'name': 'AWS_ACCESS_KEY_ID', 'value': credentials['AccessKeyId']},
{'name': 'AWS_SECRET_ACCESS_KEY', 'value': credentials['SecretAccessKey']},
{'name': 'AWS_SESSION_TOKEN', 'value': credentials['SessionToken']}
]
except Exception as e:
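# The submit call itself falls outside this excerpt. With the Batch client and
# container_overrides prepared above, the submission presumably looks roughly
# like the boto3 call below (a sketch; job_name is an assumed variable, and the
# project's exact arguments may differ):
response = client.submit_job(
    jobName=job_name,                       # e.g. derived from the pipeline/image name
    jobQueue=job_queue,                     # from the disdat config ('aws_batch_queue')
    jobDefinition=job_definition_fqn,       # re-used or newly registered above
    containerOverrides=container_overrides  # command plus optional temporary AWS credentials
)
_logger.info("Submitted AWS Batch job {}".format(response['jobId']))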
    First edition, over-write everything.
    Next edition, be smarter. Basically implement "sync"

    Args:
        data_context:
        localize (bool): If True pull all files in each bundle, else just pull *frame.pb

    Returns:

    """
    _logger.info("Fast Pull synchronizing with remote context {}@{}".format(data_context.remote_ctxt,
                                                                            data_context.remote_ctxt_url))

    remote_s3_object_dir = data_context.get_remote_object_dir()

    s3_bucket, remote_obj_dir = aws_s3.split_s3_url(remote_s3_object_dir)

    all_keys = aws_s3.ls_s3_url_keys(remote_s3_object_dir,
                                     is_object_directory=data_context.bundle_count() > aws_s3.S3_LS_USE_MP_THRESH)

    if not localize:
        all_keys = [k for k in all_keys if 'frame.pb' in k]

    fetch_count = 0
    fetch_tuples = []

    for s3_key in all_keys:
        obj_basename = os.path.basename(s3_key)
        obj_suffix = s3_key.replace(remote_obj_dir, '')
        obj_suffix_dir = os.path.dirname(obj_suffix).strip('/')  # remote_obj_dir won't have a trailing slash
        local_uuid_dir = os.path.join(data_context.get_object_dir(), obj_suffix_dir)
        local_object_path = os.path.join(local_uuid_dir, obj_basename)
        if not os.path.exists(local_object_path):
            fetch_count += 1
            fetch_tuples.append((s3_bucket, s3_key, local_object_path))

    _logger.info("Fast pull fetching {} objects...".format(fetch_count))

    results = aws_s3.get_s3_key_many(fetch_tuples)

    _logger.info("Fast pull completed {} transfers -- process pool closed and joined.".format(len(results)))