def error(msg, *args, **kwargs):
    _logger.error(msg, *args, **kwargs)
    sys.exit(1)
Args:
    fq_context_name (str): fully qualified context name, i.e., '<remote context>/<local context>' or '<local context>'

Returns:
    (str, str): the remote repo (or None) and the local context name
"""
try:
    if '/' in fq_context_name:
        repo, local_context = fq_context_name.split('/')
    else:
        repo = None
        local_context = fq_context_name
except ValueError:
    error = "Invalid context_name: Expected '<remote context>/<local context>' or '<local context>' but got '%s'" % (fq_context_name)
    _logger.error(error)
    raise Exception(error)
return repo, local_context
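# --- Usage sketch (not from the original source): the enclosing function's name is not
# --- shown above, so fq_context_name_to_parts below is a hypothetical stand-in for it.
# fq_context_name_to_parts('my-remote/my-context')  # -> ('my-remote', 'my-context')
# fq_context_name_to_parts('my-context')            # -> (None, 'my-context')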
def _run_git_cmd(git_dir, git_cmd, get_output=False):
    """Run a git command in a local git repository.

    :param git_dir: A path within a local git repository, i.e. it may be a
        subdirectory within the repository.
    :param git_cmd: The git command string to run, i.e., everything that
        would follow after :code:`git` on the command line.
    :param get_output: If :code:`True`, return the command standard output
        as a string; default is to return the command exit code.
    """
    verbose = False

    cmd = ['git', '-C', git_dir] + git_cmd.split()
    if verbose:
        _logger.debug('Running git command {}'.format(cmd))

    if get_output:
        try:
            with open(os.devnull, 'w') as null_file:
                output = subprocess.check_output(cmd, stderr=null_file)
        except subprocess.CalledProcessError as e:
            _logger.debug("Unable to run git command {}: exit {}: likely no git repo, e.g., running in a container.".format(cmd, e.returncode))
            return e.returncode
        # If P3, check_output returns a byte array. If P2, if not unicode, convert.
        # Only convert here: when get_output is False, output is an int exit code.
        output = six.ensure_str(output)
    else:
        with open(os.devnull, 'w') as null_file:
            output = subprocess.call(cmd, stdout=null_file, stderr=null_file)
    return output
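# --- Usage sketch (not from the original source): a hedged example of calling
# --- _run_git_cmd against a hypothetical checkout at /path/to/repo.
def _example_run_git_cmd_usage():
    result = _run_git_cmd('/path/to/repo', 'rev-parse HEAD', get_output=True)
    if isinstance(result, int):
        # check_output failed; result is git's exit code (e.g., no git repo present).
        return None
    return result.strip()  # the current commit hash as a string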
Parameters:
    params_str (dict): dict of str -> str, mapping parameter name to its string value.
"""
kwargs = {}
cls_params = {n: p for n, p in cls.get_params()}  # get_params() returns [(name, param), ...]
for param_name, param_str in params_str.items():
    if param_name in cls_params:
        param = cls_params[param_name]
        if isinstance(param_str, list):
            kwargs[param_name] = param._parse_list(param_str)
        else:
            kwargs[param_name] = param.parse(param_str)
    else:
        _logger.error("Parameter {} is not defined in class {}.".format(param_name, cls.__name__))
        raise ValueError("Parameter {} is not defined in class {}.".format(param_name, cls.__name__))
return kwargs
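# --- Usage sketch (not from the original source): the classmethod above turns
# --- string parameter values into typed kwargs via Luigi's Parameter.parse().
# --- MyTask is hypothetical, and _params_to_kwargs stands in for the unnamed
# --- classmethod shown above.
import luigi

class MyTask(luigi.Task):
    n_iter = luigi.IntParameter()
    label = luigi.Parameter()

# Assuming the method is exposed as MyTask._params_to_kwargs:
# MyTask._params_to_kwargs({'n_iter': '10', 'label': 'alpha'}) -> {'n_iter': 10, 'label': 'alpha'}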
""" Attach data to a bundle. The bundle must be open and not closed.
One attaches one data item to a bundle (dictionary, list, tuple, scalar, or dataframe).
Calling this replaces the latest item -- only the latest will be included in the bundle on close.
Note: One uses `add_data_row` or `add_data` but not both. Adding a row after `add_data`
removes the data. Using `add_data` after `add_data_row` removes all previously added rows.
Args:
data (list|tuple|dict|scalar|`pandas.DataFrame`):
Returns:
self
"""
self._check_open()
if self._data is not None:
    _logger.warning("Disdat API add_data replacing existing data on bundle")
self._data = data
return self
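# --- Usage sketch (not from the original source): attaching data to an open Disdat
# --- bundle. Assumes the disdat.api Bundle context manager; the context and bundle
# --- names below are hypothetical.
def _example_add_data_usage():
    import disdat.api as api
    with api.Bundle('example-context', name='examples.numbers') as b:
        b.add_data({'x': [1, 2, 3], 'y': [4, 5, 6]})  # replaces any previously attached data
    # On close, only the latest attached item is saved with the bundle.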
self._input_tags = {}
self._input_bundle_uuids = {}
upstream_tasks = [(t.user_arg_name, PathCache.get_path_cache(t)) for t in self.deps()]
for user_arg_name, pce in [u for u in upstream_tasks if u[1] is not None]:
    b = api.get(self.data_context.get_local_name(), None, uuid=pce.uuid)
    assert b.is_presentable
    # Download data that is not local (the linked files are not present).
    # This is the default behavior when running in a container.
    if self.incremental_pull:
        b.pull(localize=True)
    if pce.instance.user_arg_name in kwargs:
        _logger.warning('Task human name {} reused when naming task dependencies: Dependency hyperframe shadowed'.format(pce.instance.user_arg_name))
    self._input_tags[user_arg_name] = b.tags
    self._input_bundle_uuids[user_arg_name] = pce.uuid
    kwargs[user_arg_name] = b.data
return kwargs
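# --- Usage sketch (not from the original source): how the kwargs assembled above
# --- surface to user code. In Disdat, the name passed to add_dependency() is the
# --- user_arg_name, and the upstream bundle's data arrives in pipe_run() under that
# --- keyword. GenData and Consume are hypothetical tasks.
from disdat.pipe import PipeTask

class GenData(PipeTask):
    def pipe_run(self):
        return {'x': [1, 2, 3]}

class Consume(PipeTask):
    def pipe_requires(self):
        self.add_dependency('input_data', GenData, params={})

    def pipe_run(self, input_data=None):
        return sum(input_data['x'])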
# Forcing recomputation through a manual --force directive.
# If it is external, do not recompute in any case.
_logger.debug("resolve_bundle: --force forcing a new output bundle.")
if verbose:
    print("resolve_bundle: --force forcing a new output bundle.\n")
new_output_bundle(pipe, data_context)
return regen_bundle

if isinstance(pipe, ExternalDepTask):
    # NOTE: Even if add_external_dependency() fails to find the bundle, we still succeed here.
    # Thus it can look like we reuse a bundle when in fact we don't. We error either
    # within the user's requires(), within add_external_dependency(), or when Luigi can't find the task (the current approach).
    assert worker._is_external(pipe)
    if verbose:
        print("resolve_bundle: found ExternalDepTask, re-using bundle with UUID [{}].\n".format(pipe.uuid))
    b = api.get(data_context.get_local_name(), None, uuid=pipe.uuid)  # TODO: cache b in the ext dep object to avoid a second lookup
    if b is None:
        _logger.warning(f"Unable to resolve bundle[{pipe.uuid}] in context[{data_context.get_local_name()}]")
        reuse_bundle(pipe, b, pipe.uuid, data_context)  # Ensure that the PCE results in a file that cannot be found
    else:
        reuse_bundle(pipe, b, b.uuid, data_context)
    return use_bundle

bndls = api.search(data_context.get_local_name(),
                   processing_name=pipe.processing_id())

if bndls is None or len(bndls) <= 0:
    if verbose:
        print("resolve_bundle: No bundle with proc_name {}, getting new output bundle.\n".format(pipe.processing_id()))
    # no bundle, force recompute
    new_output_bundle(pipe, data_context)
    return regen_bundle

bndl = bndls[0]  # our best guess is the most recent bundle with the same processing_id()
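# --- Usage sketch (not from the original source): the same lookup resolve_bundle does
# --- above, via the public API. The context and processing names are hypothetical.
def _example_search_by_processing_name():
    import disdat.api as api
    bndls = api.search('example-context', processing_name='examples.pipelines.GenData_7f3a')
    return bndls[0] if bndls else None  # newest bundle with that processing_id, if any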
try:
    if not os.path.isdir(src_path):
        o = urllib.parse.urlparse(src_path)
        if o.scheme == 's3':
            # s3 to s3
            if dst_scheme == 's3':
                aws_s3.cp_s3_file(src_path, os.path.dirname(dst_file))
            elif dst_scheme != 'db':  # assume 'file'
                # s3 to local
                aws_s3.get_s3_file(src_path, dst_file)
            else:
                raise Exception("copy_in_files: copy s3 to unsupported scheme {}".format(dst_scheme))
        elif o.scheme == 'db':  # left for backward compatibility for now
            _logger.debug("Skipping a db file on bundle add")
        elif o.scheme == 'file':
            if dst_scheme == 's3':
                # local to s3
                aws_s3.put_s3_file(o.path, os.path.dirname(dst_file))
            elif dst_scheme != 'db':  # assume 'file'
                # local to local
                shutil.copy(o.path, os.path.dirname(dst_file))
            else:
                raise Exception("copy_in_files: copy local file to unsupported scheme {}".format(dst_scheme))
        else:
            raise Exception("DataContext copy-in-file found bad scheme: {} from {}".format(o.scheme, o))
    else:
        _logger.info("DataContext copy-in-file: Not adding files in directory {}".format(src_path))
except (IOError, os.error) as why: