def get_remote_output_dir(self):
    """
    Disdat Pipe API Function

    Retrieve the remote output directory for this task's bundle. You may place
    files directly into this directory.

    Returns:
        output_dir (str): The bundle's output directory on S3
    """
    pce = PathCache.get_path_cache(self)
    assert pce is not None
    if self.data_context.remote_ctxt_url and self.incremental_push:
        output_dir = os.path.join(self.data_context.get_remote_object_dir(), pce.uuid)
    else:
        raise Exception('Managed S3 path creation requires a) a remote context and b) incremental_push to be set')
    return output_dir
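# Usage sketch (hypothetical PushTask and file name; assumes a remote context is
# bound and the task runs with incremental_push enabled). Files can be written
# straight to the managed S3 path, e.g. with pandas when s3fs is installed:
#
#   class PushTask(PipeTask):
#       def pipe_run(self):
#           s3_dir = self.get_remote_output_dir()            # s3://.../objects/<uuid>
#           target_path = os.path.join(s3_dir, 'results.csv')
#           pd.DataFrame({'x': [1, 2]}).to_csv(target_path)  # pandas writes s3:// paths via s3fs
#           return {'results': [target_path]}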
def new_output_bundle(pipe, data_context, force_uuid=None):
    """
    Create a new output bundle for the task and record it in the path cache.

    Note: We don't add to the context's db yet. The job or pipe hasn't run yet, so it
    hasn't made all of its outputs. If it fails, by definition it won't write out the
    hframe to the context's directory. On rebuild / restart we will delete the directory.
    However, the path_cache will hold on to this directory in memory.

    Args:
        pipe (`disdat.pipe.PipeTask`): The task generating this output
        data_context (`disdat.data_context.DataContext`): Place output in this context
        force_uuid (str): Override uuid chosen by Disdat Bundle API

    Returns:
        None
    """
    import disdat.api as api  # Python 3.7 allows us to put this import at the top, but 3.6.8 does not

    pce = PathCache.get_path_cache(pipe)
    if pce is None:
        _logger.debug("new_output_bundle: Adding a new (unseen) task to the path cache.")
    else:
        _logger.debug("new_output_bundle: Found a task in our dag already in the path cache: reusing!")
        return

    b = api.Bundle(data_context).open(force_uuid=force_uuid)
    PathCache.put_path_cache(pipe, b, b.uuid, b.local_dir, True)
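# Call sketch (internal API; `my_task` is a hypothetical PipeTask instance):
#
#   new_output_bundle(my_task, data_context)   # first call: opens a fresh Bundle and caches it
#   pce = PathCache.get_path_cache(my_task)    # entry carries the bundle, its uuid, and local dir
#   new_output_bundle(my_task, data_context)   # second call: cache hit, returns without a new bundle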
def reuse_bundle(pipe, bundle, uuid, data_context):
    """
    Re-use this bundle: everything stays the same, just put it in the cache.

    Args:
        pipe (`pipe.PipeTask`): The pipe task that should not be re-run.
        bundle (`disdat.api.Bundle`): The found bundle to re-use.
        uuid (str): The bundle's uuid
        data_context: The context containing the bundle with this uuid.

    Returns:
        None
    """
    pce = PathCache.get_path_cache(pipe)
    if pce is None:
        _logger.debug("reuse_bundle: Adding a new (unseen) task to the path cache.")
    else:
        _logger.debug("reuse_bundle: Found a task in our dag already in the path cache: reusing!")
        return

    bundle_dir = data_context.implicit_hframe_path(uuid)
    PathCache.put_path_cache(pipe, bundle, uuid, bundle_dir, False)
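# Call sketch (internal API; `found` is a previously created bundle located by the
# caller). The trailing False, versus the True passed in new_output_bundle, appears
# to mark the cache entry as re-used rather than scheduled for a re-run:
#
#   reuse_bundle(my_task, found, found.uuid, data_context)
#   pce = PathCache.get_path_cache(my_task)
#   print(pce.path)   # the bundle's existing directory, not a fresh one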
def run(self):
    """
    Call the user's run function.
    1.) prepare the arguments
    2.) run and gather the user's result
    3.) interpret and wrap it in a HyperFrame

    Returns:
        None
    """
    kwargs = self.prepare_pipe_kwargs(for_run=True)
    pce = PathCache.get_path_cache(self)
    assert pce is not None

    # NOTE: If a user changes a task param in run(), and that param parameterizes a dependency in
    # requires(), then running requires() post run() will give different tasks. To be safe we
    # record the inputs before run().
    cached_bundle_inputs = self.bundle_inputs()
    try:
        start = time.time()  # P3 datetime.now().timestamp()
        user_rtn_val = self.pipe_run(**kwargs)
        stop = time.time()  # P3 datetime.now().timestamp()
    except Exception as error:
        # If the user's pipe fails for any reason, remove the bundle dir and re-raise
        try:
            _logger.error("User pipe_run encountered exception: {}".format(error))
            pce.bundle.abandon()
        except Exception as abandon_error:
            _logger.error("Error abandoning bundle after pipe_run failure: {}".format(abandon_error))
        raise
def prepare_pipe_kwargs(self, for_run=False):
    """
    Prepare the arguments for the user's pipe_run function, presenting each
    upstream task's output bundle as a keyword argument.

    Args:
        for_run (bool): If True, upstream tasks have completed; gather their outputs.

    Returns:
        (dict): A dictionary with the arguments.
    """
    kwargs = dict()

    # Place upstream task outputs into the kwargs. Thus the user does not call
    # self.inputs(). If they did, they would get a list of output targets for the bundle.
    if for_run:
        # Reset the stored tags, in case this instance is run multiple times.
        self._input_tags = {}
        self._input_bundle_uuids = {}
        upstream_tasks = [(t.user_arg_name, PathCache.get_path_cache(t)) for t in self.deps()]
        for user_arg_name, pce in [u for u in upstream_tasks if u[1] is not None]:
            b = api.get(self.data_context.get_local_name(), None, uuid=pce.uuid)
            assert b.is_presentable
            # Download data that is not local (the linked files are not present).
            # This is the default behavior when running in a container.
            if self.incremental_pull:
                b.pull(localize=True)
            if pce.instance.user_arg_name in kwargs:
                _logger.warning('Task human name {} reused when naming task dependencies: Dependency hyperframe shadowed'.format(pce.instance.user_arg_name))
            self._input_tags[user_arg_name] = b.tags
            self._input_bundle_uuids[user_arg_name] = pce.uuid
            kwargs[user_arg_name] = b.data

    return kwargs
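# Effect sketch for pipeline authors (class names are hypothetical; add_dependency
# is the disdat API for naming upstream outputs). The name given to a dependency
# becomes the keyword under which its presented bundle data arrives in pipe_run:
#
#   class A(PipeTask):
#       def pipe_run(self):
#           return [1, 2, 3]
#
#   class B(PipeTask):
#       def pipe_requires(self):
#           self.add_dependency('a_out', A, params={})
#       def pipe_run(self, a_out=None):
#           return len(a_out)   # a_out is A's presented bundle data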
def add_bundle_meta_files(pipe_task):
    """
    Given a pipe task, create the hframe output target for its bundle.

    Use the pipe_task (or driver task) to get the name of the bundle.
    Use the name of the bundle to look up the output path in the pipe cache in the
    PipeFS class object.
    Create an hframe. The individual frame records have to be written out beforehand.

    Args:
        pipe_task: The pipe task that will use these outputs

    Returns:
        (dict): {PipeBase.HFRAME: luigi.LocalTarget} for the hframe file
    """
    pce = PathCache.get_path_cache(pipe_task)
    if pce is None:
        # This can happen when the pipe has been created with non-deterministic parameters
        _logger.error("add_bundle_meta_files: could not find pce for task {}".format(pipe_task.processing_id()))
        _logger.error("It is possible one of your tasks is parameterized in a non-deterministic fashion.")
        raise Exception("add_bundle_meta_files: Unable to find pce for task {}".format(pipe_task.processing_id()))

    hframe = {PipeBase.HFRAME: luigi.LocalTarget(os.path.join(pce.path, HyperFrameRecord.make_filename(pce.uuid)))}
    return hframe
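# Call sketch (internal API): luigi decides whether a task is complete by checking
# that its output targets exist, so the hframe file doubles as the done-marker:
#
#   targets = add_bundle_meta_files(some_pipe_task)
#   hframe_target = targets[PipeBase.HFRAME]   # a luigi.LocalTarget
#   print(hframe_target.path)                  # <bundle dir>/<hframe file for uuid>
#   print(hframe_target.exists())              # False until the hframe is written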
def get_output_dir(self):
    """
    Disdat Pipe API Function

    Retrieve the output directory for this task's bundle. You may place
    files directly into this directory.

    Returns:
        output_dir (str): The bundle's output directory
    """
    pce = PathCache.get_path_cache(self)
    assert pce is not None
    return pce.path
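# Usage sketch (hypothetical task and file name). Paths returned from pipe_run
# that live under the managed output dir are linked into the bundle:
#
#   class MakeReport(PipeTask):
#       def pipe_run(self):
#           out_dir = self.get_output_dir()
#           path = os.path.join(out_dir, 'report.txt')
#           with open(path, 'w') as f:
#               f.write('all good')
#           return {'report': [path]}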