# no bundle, force recompute
new_output_bundle(pipe, data_context)
return regen_bundle
bndl = bndls[0] # our best guess is the most recent bundle with the same processing_id()
# 2.) Bundle exists - lineage object tells us input bundles.
lng = bndl.get_lineage()
if lng is None:
    if verbose: print("resolve_bundle: No lineage present, getting new output bundle.\n")
    new_output_bundle(pipe, data_context)
    return regen_bundle
# 3.) Lineage record exists -- if new code, re-run
pipeline_path = os.path.dirname(sys.modules[pipe.__module__].__file__)
current_version = fs.DisdatFS().get_pipe_version(pipeline_path)
if different_code_versions(current_version, lng):
    if verbose: print("resolve_bundle: New code version, getting new output bundle.\n")
    new_output_bundle(pipe, data_context)
    return regen_bundle
# 3.5.) Have we changed the output human bundle name? If so, re-run task.
# Note: we need to go through all the bundle versions with that processing_id,
# because, at the moment, we make a new bundle whenever the name changes, even though
# in some sense the name is just a tag and the underlying data should be the same.
current_human_name = pipe.human_id()
found = False
for bndl in bndls:
    if current_human_name == bndl.get_human_name():
        found = True
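# --- Hedged sketch (illustration only) -----------------------------------------
# Idea behind step 3.5: a processing_id identifies the computation (task class +
# parameters), while the human name is a user-facing label, so several bundle
# versions can share a processing_id under different names. BundleStub is a
# hypothetical stand-in for a disdat bundle record.
from collections import namedtuple

BundleStub = namedtuple('BundleStub', 'processing_id human_name')

def _name_already_used_sketch(current_human_name, bundle_versions):
    """Return True if any prior version of this computation carried the current name."""
    return any(b.human_name == current_human_name for b in bundle_versions)

versions = [BundleStub('task_abc123', 'daily_report'),
            BundleStub('task_abc123', 'weekly_report')]
assert _name_already_used_sketch('daily_report', versions)
assert not _name_already_used_sketch('monthly_report', versions)   # name changed -> re-run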
apply.reference_count -= 1
if not apply.reference_count:
    fs.DisdatFS().clear_pipe_version()
    PathCache.clear_path_cache()
# Re-execute logic -- make copy of task DAG
# Creates a cache of {pipe:path_cache_entry} in the pipesFS object.
# This "task_path_cache" is used throughout execution to find output bundles.
reexecute_dag = driver.DriverTask(output_bundle, pipe_params,
                                  pipe_cls, input_tags, output_tags, force_all,
                                  data_context, incremental_push, incremental_pull)
# Get version information for pipeline
users_root_task = reexecute_dag.deps()[0]
pipeline_path = os.path.dirname(sys.modules[users_root_task.__module__].__file__)
fs.DisdatFS().get_pipe_version(pipeline_path)
# If the user just wants to re-run this task, use mark_force
if force:
    users_root_task.mark_force()
# Resolve bundles. This calls the tasks' requires(); the user's code may throw exceptions.
# That's OK, but we then have to clean up the path cache entries (PCE).
try:
    did_work = resolve_workflow_bundles(reexecute_dag, data_context)
except Exception as e:
    cleanup_pce()
    raise
# At this point the path cache should be full of existing or new UUIDs.
# we are going to replace the final pipe's UUID if the user has passed one in.
# this happens when we run the docker container.
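# --- Hedged sketch (illustration only) -----------------------------------------
# The comment above describes swapping the final task's UUID for one supplied
# externally (e.g. by the docker container). The dict shape and the entry fields
# (uuid, path, rerun) are illustrative assumptions, not disdat's actual types.
from collections import namedtuple

CacheEntrySketch = namedtuple('CacheEntrySketch', 'uuid path rerun')

def _swap_root_uuid_sketch(path_cache, root_key, forced_uuid):
    """Replace the root task's cache entry UUID (and path) with the forced UUID."""
    entry = path_cache[root_key]
    path_cache[root_key] = entry._replace(uuid=forced_uuid,
                                          path=entry.path.replace(entry.uuid, forced_uuid))
    return path_cache

cache = {'root_task': CacheEntrySketch('uuid-old', '/bundles/uuid-old', True)}
cache = _swap_root_uuid_sketch(cache, 'root_task', 'uuid-from-container')
assert cache['root_task'].path == '/bundles/uuid-from-container'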
    no_submit (bool): Produce the AWS job config (for AWS Batch), but do not submit the job
    job_role_arn (str): The AWS role under which the job should execute
    aws_session_token_duration (int): The number of seconds our temporary credentials should last
    cli (bool): Whether run was called from the CLI or from the API (buffer output)

Returns:
    job_result (json): A JSON blob with information about the run job; an empty dict on error.
        If the backend is SageMaker, return the TrainingJobArn. If the backend is AWS Batch,
        return the Batch job description. If local, return stdout.
"""
def assert_or_log(cli, msg):
    if cli:
        _logger.error(msg)
    else:
        assert False, msg
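# --- Hedged usage note (illustration only) --------------------------------------
# With cli=True the helper just logs and the caller returns None (empty result);
# with cli=False it raises AssertionError so API callers fail loudly. Standalone
# demonstration with its own stub logger:
import logging

_sketch_logger = logging.getLogger('assert_or_log_sketch')

def _assert_or_log_sketch(cli, msg):
    if cli:
        _sketch_logger.error(msg)   # CLI: report and keep going
    else:
        assert False, msg           # API: surface the failure immediately

_assert_or_log_sketch(True, 'setup.py missing')
try:
    _assert_or_log_sketch(False, 'setup.py missing')
except AssertionError:
    pass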
pfs = fs.DisdatFS()
pipeline_setup_file = os.path.join(pipeline_root, 'setup.py')
if not common.setup_exists(pipeline_setup_file):
    return assert_or_log(cli, "Disdat run: Unable to find setup.py file [{}].".format(pipeline_setup_file))
# When run in a container, we create the uuid externally to look for a specific result
output_bundle_uuid = pfs.disdat_uuid()
# If the user did not specify a context, use the configuration of the current context
if context is None:
    if not pfs.in_context():
        return assert_or_log(cli, "Disdat run: Not running in a local context. Switch or specify.")
    remote, context = common.get_run_command_parameters(pfs)

if remote is None and (not no_push or not no_pull):  # if pulling or pushing, need a remote
    return assert_or_log(cli, "Pushing or pulling bundles with 'run' requires a remote.")
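# --- Hedged sketch (illustration only) -------------------------------------------
# The condition above reads a bit inverted: a remote is required whenever the run
# will push OR pull bundles, i.e. whenever either "no_*" flag is left disabled.
def _needs_remote_sketch(no_push, no_pull):
    return (not no_push) or (not no_pull)

assert _needs_remote_sketch(False, False)     # pushing and pulling -> remote required
assert _needs_remote_sketch(True, False)      # still pulling       -> remote required
assert not _needs_remote_sketch(True, True)   # fully local run     -> no remote needed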
def _lineage(**kwargs):
    """Invoke the api.lineage() call from the CLI to find the lineage.

    Args:
        kwargs: command line args or internal dict call; must contain uuid:str and depth:int.

    Returns:
        None
    """
    fs = disdat.fs.DisdatFS()

    if not fs.in_context():
        _logger.warning('Not in a data context')
        return

    ctxt = fs._curr_context.get_local_name()

    # (depth, uuid, lineage)
    lin_tuples = api.lineage(ctxt, kwargs['uuid'], kwargs['depth'])

    for (d, uuid, l) in lin_tuples:
        if l is None:
            print("No lineage found for UUID {}".format(uuid))
        else:
            print_lineage_protobuf(l, d)
            print()
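# --- Hedged usage sketch (illustration only) --------------------------------------
# The same lineage walk from Python rather than the CLI. 'my-context' and the
# bundle UUID are placeholders; the (depth, uuid, lineage) tuple shape mirrors
# what _lineage() iterates over above.
import disdat.api as api

def print_upstream_sketch(context_name, bundle_uuid, depth=2):
    for d, uuid, lineage in api.lineage(context_name, bundle_uuid, depth):
        if lineage is None:
            print("No lineage found for UUID {}".format(uuid))
        else:
            print("depth {}: bundle {} has lineage record {}".format(d, uuid, lineage))

# print_upstream_sketch('my-context', '<bundle-uuid>', depth=3)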
def _add(args):
    """Invoke the api.add() call from the CLI to create a bundle.

    Args:
        args: command line args.

    Returns:
        None
    """
    fs = disdat.fs.DisdatFS()

    if not fs.in_context():
        _logger.warning('Not in a data context')
        return

    _ = api.add(fs._curr_context.get_local_name(),
                args.bundle,
                args.path_name,
                tags=common.parse_args_tags(args.tag))

    return
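# --- Hedged usage sketch (illustration only) --------------------------------------
# The same bundle creation via the Python API instead of the CLI. The context
# name, bundle name, path, and tags below are placeholders.
import disdat.api as api

def add_example_bundle_sketch(context_name='my-context'):
    """Create a bundle named 'my.bundle' from a local file with one tag."""
    api.add(context_name,
            'my.bundle',                 # human bundle name
            '/tmp/my_data.csv',          # file (or directory) to capture
            tags={'source': 'example'})  # optional key/value tags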
def cleanup_pce():
    """
    After running, decrement our reference count (which tells how many simultaneous apply methods
    are running nested in this process). Once the last one completes, blow away our path cache and
    git hash. Needed if we're run twice (from scratch) in the same process.
    """
    apply.reference_count -= 1
    if not apply.reference_count:
        fs.DisdatFS().clear_pipe_version()
        PathCache.clear_path_cache()
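# --- Hedged sketch (illustration only) ---------------------------------------------
# The reference-counting pattern described above, detached from disdat's globals:
# only the outermost call tears down shared state, so nested apply() invocations
# in the same process stay safe.
_shared_cache = {}
_ref_count = 0

def _run_with_cleanup_sketch(work):
    global _ref_count
    _ref_count += 1
    try:
        return work()
    finally:
        _ref_count -= 1
        if _ref_count == 0:      # last one out clears the caches
            _shared_cache.clear()

_run_with_cleanup_sketch(lambda: _run_with_cleanup_sketch(
    lambda: _shared_cache.setdefault('key', 'value')))
assert _ref_count == 0 and _shared_cache == {}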
def get_all_pipesline_output_bundles():
    """
    Find all output bundles for the pipes attached to the driver task.

    The DisdatFS object has a cache of [(pipe instance, path, rerun)].

    Note: This does not include the driver's output bundle.

    :return: dict of {bundle_name: PipeCacheEntry}
    """
    all_bundles = defaultdict(PipeCacheEntry)

    pcache = DisdatFS.path_cache()

    for p_name, p_entry in pcache.items():  # @UnusedVariable
        all_bundles[p_entry.instance.name_output_bundle()] = p_entry

    return all_bundles
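# --- Hedged sketch (illustration only) ---------------------------------------------
# Shape of the mapping this helper builds: path-cache entries keyed by processing
# name are re-keyed by each pipe's output bundle name. The stub classes below are
# placeholders for the real pipe and PipeCacheEntry types.
from collections import namedtuple

EntrySketch = namedtuple('EntrySketch', 'instance path rerun')

class PipeStub(object):
    def __init__(self, bundle_name):
        self._bundle_name = bundle_name
    def name_output_bundle(self):
        return self._bundle_name

path_cache = {'proc_a_1234': EntrySketch(PipeStub('bundle_a'), '/bundles/uuid-a', False)}
by_bundle_name = {e.instance.name_output_bundle(): e for e in path_cache.values()}
assert 'bundle_a' in by_bundle_name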
_logger.debug("force_all: {}".format(force_all))
_logger.debug("input tags: {}".format(input_tags))
_logger.debug("output tags: {}".format(output_tags))
_logger.debug("sys.path {}".format(sys.path))
_logger.debug("central_scheduler {}".format(central_scheduler))
_logger.debug("workers {}".format(workers))
_logger.debug("incremental_push {}".format(incremental_push))
_logger.debug("incremental_pull {}".format(incremental_pull))
if incremental_push:
    _logger.warning("incremental_push {}".format(incremental_push))
if incremental_pull:
    _logger.warning("incremental_pull {}".format(incremental_pull))
pfs = fs.DisdatFS()
if data_context is None:
    if not pfs.in_context():
        _logger.warning('Not in a data context')
        return None
    data_context = pfs.curr_context
# Increment the reference count for this process
apply.reference_count += 1
def cleanup_pce():
    """
    After running, decrement our reference count (which tells how many simultaneous apply methods
    are running nested in this process). Once the last one completes, blow away our path cache and
    git hash. Needed if we're run twice (from scratch) in the same process.
    """