if ctxt_dir is None:
    raise Exception("Unable to load context without a metadata directory argument")

contexts = {}

files = glob.glob(os.path.join(ctxt_dir, '*'))

for ctxt in files:
    if len(target_contexts) > 0 and ctxt not in target_contexts:
        continue
    #_logger.debug("Loading context {}...".format(ctxt))
    meta_file = os.path.join(ctxt_dir, ctxt, META_CTXT_FILE)
    if not os.path.isfile(meta_file):
        _logger.debug("No disdat {} meta ctxt data file found.".format(meta_file))
    else:
        with open(meta_file, 'r') as json_file:
            dc_dict = json.loads(json_file.readline())
            dc = DataContext(ctxt_dir, **dc_dict)
            contexts[dc.local_ctxt] = dc

return contexts
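
# --- Illustrative sketch (not part of the original source) ---
# The loader above expects one sub-directory per context under ctxt_dir, each holding a
# one-line JSON metadata file (META_CTXT_FILE) whose keys are DataContext constructor
# arguments. A minimal example of writing such a file; the filename and the 'local_ctxt'
# key here are assumptions standing in for the real constants.
import json
import os

def write_context_meta(ctxt_dir, context_name, meta_filename='ctxt.json'):
    """Write a one-line JSON metadata file for a hypothetical context."""
    ctxt_path = os.path.join(ctxt_dir, context_name)
    os.makedirs(ctxt_path, exist_ok=True)
    with open(os.path.join(ctxt_path, meta_filename), 'w') as f:
        # json.loads(json_file.readline()) above reads exactly one line back.
        f.write(json.dumps({'local_ctxt': context_name}))
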
def remote(local_context, remote_context, remote_url):
    """ Add a remote to local_context.

    Note that this local context may already have a remote bound. This means that it might have
    references to bundles that have not been localized (file references will be 's3:').

    Args:
        local_context (str): The name of the local context to which to add the remote.
        remote_context (str): The name of the remote context.
        remote_url (str): The S3 path that holds the contexts, e.g., s3://disdat-prod/beta/

    Returns:
        None
    """
    _logger.debug("Adding remote context {} at URL {} on local context '{}'".format(remote_context,
                                                                                    remote_url,
                                                                                    local_context))

    # Be generous and fix up S3 URLs to end on a directory.
    remote_url = '{}/'.format(remote_url.rstrip('/'))

    data_context = _get_context(local_context)
    data_context.bind_remote_ctxt(remote_context, remote_url)
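
# --- Illustrative usage sketch (not part of the original source) ---
# A hypothetical call binding a local context to the S3 remote from the docstring:
#   remote('my-local-context', 'my-remote-context', 's3://disdat-prod/beta')
# The normalization above guarantees a trailing slash either way:
remote_url_example = 's3://disdat-prod/beta'
assert '{}/'.format(remote_url_example.rstrip('/')) == 's3://disdat-prod/beta/'
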
"""
dl_retry = 3
s3 = b3.resource('s3')
#print(f"get_s3_key the b3[{b3}] and client[{b3.client} and resource[{b3.resource}]")
if filename is None:
filename = os.path.basename(key)
else:
path = os.path.dirname(filename)
if not os.path.exists(path):
try:
os.makedirs(path)
except os.error as ose:
# swallow error -- likely directory already exists from other process
_logger.debug("aws_s3.get_s3_key: Error code {}".format(os.strerror(ose.errno)))
while dl_retry > 0:
try:
s3.Bucket(bucket).download_file(key, filename)
dl_retry = -1
except Exception as e:
_logger.warn("aws_s3.get_s3_key Retry Count [{}] on download_file raised exception {}".format(dl_retry, e))
dl_retry -= 1
if dl_retry <= 0:
_logger.warn("aws_s3.get_s3_key Fail on downloading file after 3 retries with exception {}".format(e))
raise
#print "PID({}) STOP bkt[] key[{}] file[{}]".format(multiprocessing.current_process(),key,filename)
return filename
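
# --- Illustrative sketch (not part of the original source) ---
# The same retry-until-success pattern with boto3 directly; bucket and key names
# here are hypothetical.
import boto3

def download_with_retries(bucket, key, filename, retries=3):
    """Download an S3 object, retrying up to `retries` times before giving up."""
    s3 = boto3.resource('s3')
    for attempt in range(1, retries + 1):
        try:
            s3.Bucket(bucket).download_file(key, filename)
            return filename
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the last error to the caller
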
def reuse_bundle(pipe, bundle, uuid, data_context):
    """ Re-use this bundle: everything stays the same, just put it in the path cache.

    Args:
        pipe (`pipe.PipeTask`): The pipe task that should not be re-run.
        bundle (`disdat.api.bundle`): The found bundle to re-use.
        uuid (str): The bundle's uuid.
        data_context: The context containing this bundle with UUID hfr_uuid.

    Returns:
        None
    """
    pce = PathCache.get_path_cache(pipe)
    if pce is None:
        _logger.debug("reuse_bundle: Adding a new (unseen) task to the path cache.")
    else:
        _logger.debug("reuse_bundle: Found a task in our dag already in the path cache: reusing!")
        return

    dir = data_context.implicit_hframe_path(uuid)
    PathCache.put_path_cache(pipe, bundle, uuid, dir, False)
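
# --- Illustrative sketch (not part of the original source) ---
# The path cache acts as a per-run memo: the first time a task is seen, its existing
# bundle directory is recorded; later lookups return early. A toy analogue with a plain
# dict (all names here are hypothetical, not the real PathCache API):
_toy_path_cache = {}

def toy_reuse(task_id, bundle_dir):
    """Record bundle_dir for task_id once; ignore repeat registrations."""
    if task_id in _toy_path_cache:
        return  # already registered in this DAG -- nothing to do
    _toy_path_cache[task_id] = {'path': bundle_dir, 'rerun': False}
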
# continue
# Src path can contain a sub-directory.
sub_dir = DataContext.find_subdir(src_path, dst_dir)
dst_file = os.path.join(dst_dir, sub_dir, os.path.basename(src_path))

if dst_scheme != 's3' and dst_scheme != 'db':
    file_set.append(urllib.parse.urljoin('file:', dst_file))
else:
    file_set.append(dst_file)

if src_path.startswith(os.path.dirname(file_set[-1])):
    # This can happen if you re-push something already pushed that's not localized,
    # or if the user places files directly in the output directory (or in a sub-directory of it).
    file_set[-1] = src_path
    _logger.debug("DataContext: copy_in_files found src {} == dst {}".format(src_path, file_set[-1]))
    # But it can also happen if you re-bind and push. So check that the file is present!
    if urllib.parse.urlparse(src_path).scheme == 's3' and not aws_s3.s3_path_exists(src_path):
        print("DataContext: copy_in_files found s3 link {} not present!".format(src_path))
        print("It is likely that this bundle existed on another remote branch and ")
        print("was not localized before changing remotes.")
        raise Exception("copy_in_files: bad localized bundle push.")
    continue

try:
    if not os.path.isdir(src_path):
        o = urllib.parse.urlparse(src_path)

        if o.scheme == 's3':
            # s3 to s3
            if dst_scheme == 's3':
                aws_s3.cp_s3_file(src_path, os.path.dirname(dst_file))
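
# --- Illustrative sketch (not part of the original source) ---
# How the scheme dispatch above distinguishes S3 links from local files, and how a
# sub-directory under a source root can be preserved at the destination. find_subdir is
# disdat-specific; the simple path arithmetic here is only a rough stand-in.
import os
import urllib.parse

src = 's3://my-bucket/data/train/part-0.csv'  # hypothetical paths
dst_dir = '/tmp/output'

scheme = urllib.parse.urlparse(src).scheme    # 's3' here, '' for a bare local path
sub_dir = os.path.dirname(urllib.parse.urlparse(src).path).lstrip('/')  # 'data/train'
dst_file = os.path.join(dst_dir, sub_dir, os.path.basename(src))
print(scheme, dst_file)                       # s3 /tmp/output/data/train/part-0.csv
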
# A batch EC2 instance might not have access to the S3 remote. To
# get around this, allow the user to create some temporary AWS
# credentials.
if aws_session_token_duration > 0 and job_role_arn is None:
    sts_client = b3.client('sts')
    try:
        token = sts_client.get_session_token(DurationSeconds=aws_session_token_duration)
        credentials = token['Credentials']
        container_overrides['environment'] = [
            {'name': 'AWS_ACCESS_KEY_ID', 'value': credentials['AccessKeyId']},
            {'name': 'AWS_SECRET_ACCESS_KEY', 'value': credentials['SecretAccessKey']},
            {'name': 'AWS_SESSION_TOKEN', 'value': credentials['SessionToken']}
        ]
    except Exception as e:
        _logger.debug("Unable to generate an STS token, instead trying user's default credentials...")
        credentials = b3.session.Session().get_credentials()
        container_overrides['environment'] = [
            {'name': 'AWS_ACCESS_KEY_ID', 'value': credentials.access_key},
            {'name': 'AWS_SECRET_ACCESS_KEY', 'value': credentials.secret_key},
            {'name': 'AWS_SESSION_TOKEN', 'value': credentials.token}
        ]

job = client.submit_job(jobName=job_name, jobDefinition=job_definition_fqn, jobQueue=job_queue,
                        containerOverrides=container_overrides)

status = job['ResponseMetadata']['HTTPStatusCode']
if status == 200:
    _logger.info('Job {} (ID {}) with definition {} submitted to AWS Batch queue {}'.format(job['jobName'], job['jobId'],
                                                                                             job_definition_fqn, job_queue))
    return job
else:
    # (handling of a failed, non-200 submission status is not shown in this excerpt)
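
# --- Illustrative sketch (not part of the original source) ---
# Minting short-lived credentials with STS and handing them to a container as
# environment variables, mirroring the branch above; the duration is hypothetical.
import boto3

def temporary_credential_env(duration_seconds=3600):
    """Return AWS Batch-style environment entries built from an STS session token."""
    creds = boto3.client('sts').get_session_token(DurationSeconds=duration_seconds)['Credentials']
    return [
        {'name': 'AWS_ACCESS_KEY_ID', 'value': creds['AccessKeyId']},
        {'name': 'AWS_SECRET_ACCESS_KEY', 'value': creds['SecretAccessKey']},
        {'name': 'AWS_SESSION_TOKEN', 'value': creds['SessionToken']},
    ]
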
tempdir = tempfile.mkdtemp()
with open(os.path.join(tempdir, 'hyperparameters.json'), 'w') as of:
    json.dump(_sagemaker_hyperparameters_from_arglist(arglist), of)
    args = ['train']  # rewrite to just 'train'

# On macOS, tempfile returns /var, but it is actually /private/var.
# Add /private since that dir (and not /var) is shared with Docker.
if on_macos:
    localdir = os.path.join('/private', tempdir[1:])
else:
    localdir = tempdir

volumes[localdir] = {'bind': '/opt/ml/input/config/', 'mode': 'rw'}
_logger.info("VOLUMES: {}".format(volumes))
else:
    pipeline_image_name = common.make_project_image_name(pipeline_setup_file)

_logger.debug('Running image {} with arguments {}'.format(pipeline_image_name, arglist))

try:
    stdout = client.containers.run(pipeline_image_name, arglist, detach=False,
                                   environment=environment, init=True, stderr=True, volumes=volumes)
    stdout = six.ensure_str(stdout)
    if cli:
        print(stdout)
    return stdout
except docker.errors.ContainerError as ce:
    _logger.error("Internal error running image {}".format(pipeline_image_name))
    _logger.error("Error: {}".format(six.ensure_str(ce.stderr)))
    return six.ensure_str(ce)
except docker.errors.ImageNotFound:
    _logger.error("Unable to find the docker image {}".format(pipeline_image_name))
    return None
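
# --- Illustrative usage sketch (not part of the original source) ---
# Running a hypothetical pipeline image with the docker SDK, mirroring the call above;
# the image name and command are placeholders.
import docker

def run_image(image='disdat-example-pipeline:latest', command=('--help',)):
    """Run a container to completion and return its decoded stdout, or None if the image is missing."""
    client = docker.from_env()
    try:
        out = client.containers.run(image, list(command), detach=False, stderr=True)
        return out.decode('utf-8')
    except docker.errors.ImageNotFound:
        return None
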
    force_all (bool): Force recomputation of this pipe and all of its upstream dependencies.
    input_tags (dict): Tags used to find the input bundle.
    output_tags (dict): Tags that need to be placed on the output bundle.
    output_bundle_uuid (str): Optionally specify exactly the UUID of the output bundle IFF we actually need to produce it.
    central_scheduler (bool): Use a centralized Luigi scheduler (default False, i.e., --local-scheduler is used).
    workers (int): The number of Luigi workers to use for this workflow (default 1).
    data_context: Actual context object, or None to read the current context.
    incremental_push (bool): Whether this job should push tasks as they complete to the remote (if configured).
    incremental_pull (bool): Whether this job should localize bundles as needed from the remote (if configured).

Returns:
    bool: True if tasks needed to be run, False if no tasks (beyond the wrapper task) executed.
"""
_logger.debug("driver {}".format(driver.DriverTask))
_logger.debug("pipe_cls {}".format(pipe_cls))
_logger.debug("pipe params: {}".format(pipe_params))
_logger.debug("force: {}".format(force))
_logger.debug("force_all: {}".format(force_all))
_logger.debug("input tags: {}".format(input_tags))
_logger.debug("output tags: {}".format(output_tags))
_logger.debug("sys.path {}".format(sys.path))
_logger.debug("central_scheduler {}".format(central_scheduler))
_logger.debug("workers {}".format(workers))
_logger.debug("incremental_push {}".format(incremental_push))
_logger.debug("incremental_pull {}".format(incremental_pull))
if incremental_push:
    _logger.warn("incremental_push {}".format(incremental_push))
if incremental_pull:
    _logger.warn("incremental_pull {}".format(incremental_pull))
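
# --- Illustrative sketch (not part of the original source) ---
# How scheduler and worker options like those documented above typically map onto a
# Luigi invocation; the task list here is hypothetical.
import luigi

def run_tasks(tasks, central_scheduler=False, workers=1):
    """Run tasks with luigi.build, using the local scheduler unless central_scheduler is set."""
    return luigi.build(tasks, local_scheduler=not central_scheduler, workers=workers)
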
import disdat.api as api  # 3.7 allows us to put this import at the top, but not 3.6.8

# These are constants
verbose = False
use_bundle = True
regen_bundle = False

# 1.) Get output bundle for pipe_id (the specific pipeline/transform/param hash).
if verbose:
    print("resolve_bundle: looking up bundle {}".format(pipe.processing_id()))

if pipe._mark_force and not isinstance(pipe, ExternalDepTask):
    # Forcing recomputation through a manual annotation in the pipe.pipe_requires() itself.
    # If it is external, we don't recompute in any case.
    _logger.debug("resolve_bundle: pipe.mark_force forcing a new output bundle.")
    if verbose: print("resolve_bundle: pipe.mark_force forcing a new output bundle.\n")
    new_output_bundle(pipe, data_context)
    return regen_bundle

if pipe.force and not isinstance(pipe, ExternalDepTask):
    # Forcing recomputation through a manual --force directive.
    # If it is external, do not recompute in any case.
    _logger.debug("resolve_bundle: --force forcing a new output bundle.")
    if verbose: print("resolve_bundle: --force forcing a new output bundle.\n")
    new_output_bundle(pipe, data_context)
    return regen_bundle

if isinstance(pipe, ExternalDepTask):
    # NOTE: Even if add_external_dependency() fails to find the bundle, we still succeed here.
    # Thus it can look like we reuse a bundle when in fact we don't. We error either
    # within the user's requires, add_external_dependency(), or when Luigi can't find the task (current approach).
    assert worker._is_external(pipe)
    if verbose: print("resolve_bundle: found ExternalDepTask re-using bundle with UUID[{}].\n".format(pipe.uuid))

    b = api.get(data_context.get_local_name(), None, uuid=pipe.uuid)  # TODO: cache b in ext dep object, no 2x lookup
    if b is None:
        _logger.warn(f"Unable to resolve bundle[{pipe.uuid}] in context[{data_context.get_local_name()}]")
        reuse_bundle(pipe, b, pipe.uuid, data_context)  # Ensure that the PCE results in a file that cannot be found
    else:
        reuse_bundle(pipe, b, b.uuid, data_context)