Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_input_args(self):
    """Build ``--<name>=<uri>`` arguments from this task's inputs.

    A single ``luigi.Target`` input is treated as ``{"input": target}``.
    Otherwise the input must be a dict mapping an argument name to a
    target or a (possibly nested) collection of targets. When a value is
    itself a dict (an upstream task with several named outputs), each of
    its entries is emitted as ``--<name>-<key>=<uri>``.

    Returns:
        A list of ``--name=uri`` strings, one per flattened target; an
        empty list when there is no input.

    Raises:
        ValueError: if the input is non-empty and not a dict.
    """
    # TODO(brianm): this doesn't work when subclass yields from `requires`
    job_input = self.input()
    if isinstance(job_input, luigi.Target):
        job_input = {"input": job_input}
    if len(job_input) == 0:  # default requires()
        return []
    if not isinstance(job_input, dict):
        raise ValueError("Input (requires()) must be dict type")

    input_args = []
    for name, targets in job_input.items():
        if isinstance(targets, dict):
            # If targets is a dict that means it had multiple outputs. In
            # this case make the input args "<input>-<key>".
            named_groups = [
                ("%s-%s" % (name, key), value)
                for key, value in targets.items()
            ]
        else:
            named_groups = [(name, targets)]
        # Flatten each group on its own so every URI is paired with the
        # name of the entry it came from. Zipping all keys against one
        # global flatten misaligns (and silently drops) URIs whenever an
        # entry holds zero or several targets.
        for arg_name, group in named_groups:
            for target in luigi.task.flatten(group):
                input_args.append("--%s=%s" % (arg_name, get_uri(target)))
    return input_args
def source_uris(self):
    """Return the fully-qualified Google Cloud Storage URIs of the input data.

    Each URI can contain one '*' wildcard character, which must come after
    the 'bucket' name. The URIs are the ``path`` attribute of every
    flattened input target.
    """
    uris = []
    for target in luigi.task.flatten(self.input()):
        uris.append(target.path)
    return uris
# NOTE(review): this looks like the body of make_tree_info (it calls itself
# below); the def line and the original indentation are not present here.
# Check completion while ignoring the "without outputs has no custom
# complete() method" warning.
with warnings.catch_warnings():
warnings.filterwarnings(action='ignore', message='Task .* without outputs has no custom complete() method')
is_task_complete = task.complete()
is_complete = ('COMPLETE' if is_task_complete else 'PENDING')
# Start this task's entry on a new line at the current indent.
result = '\n' + indent
# The last sibling gets a corner connector; other siblings get a tee and
# add a vertical bar to the indent used for their descendants.
if last:
result += '└─-'
indent += ' '
else:
result += '|--'
indent += '| '
name = task.__class__.__name__
result += f'({is_complete}) {name}[{task.make_unique_id()}]'
# Optionally append parameters, output paths, processing time and task log.
if details:
params = task.get_info(only_significant=True)
output_paths = [t.path() for t in luigi.task.flatten(task.output())]
processing_time = task.get_processing_time()
# A float processing time is shown with an "s" suffix; any other value is
# formatted as-is.
if type(processing_time) == float:
processing_time = str(processing_time) + 's'
result += f'(parameter={params}, output={output_paths}, time={processing_time}, task_log={dict(task.get_task_log())})'
# Recurse into every required task; the final child is flagged as last.
children = luigi.task.flatten(task.requires())
for index, child in enumerate(children):
result += make_tree_info(child, indent, (index + 1) == len(children), details=details)
return result
def _make_hash_id(self):
    """Return an MD5 hex digest identifying this task and its requirements.

    The digest covers, in order: one entry per flattened required task,
    this task's significant parameters, and this task's class name.
    """

    def _describe(dep):
        # Non-TaskOnKart tasks contribute their significant parameters.
        if not isinstance(dep, TaskOnKart):
            return dep.to_str_params(only_significant=True)
        # TaskOnKart tasks contribute their unique id, unless they are
        # marked as not significant, in which case they are left out.
        if dep.significant:
            return str(dep.make_unique_id())
        return None

    parts = []
    for dep in luigi.task.flatten(self.requires()):
        described = _describe(dep)
        if described is not None:
            parts.append(described)
    parts += [self.to_str_params(only_significant=True), self.__class__.__name__]
    return hashlib.md5(str(parts).encode()).hexdigest()
def get_workflow_alarm_puts(task):
    """Collect the alarm puts of ``task`` and, recursively, of its requirements.

    Starts from ``get_task_alarm_puts(task)`` and adds the result of this
    function for each flattened entry of ``task.requires()``, in order.
    """
    collected = get_task_alarm_puts(task)
    for upstream in flatten(task.requires()):
        collected += get_workflow_alarm_puts(upstream)
    return collected
def traverse(t, path=None):
    """Return the tasks reachable from ``t``, depth-first, without repeats.

    ``path`` is the list of tasks already visited (``None`` means none yet).
    It is not modified; a new list with ``t`` and its not-yet-seen
    requirements appended is returned.
    """
    visited = [t] if path is None else path + [t]
    for dep in flatten(t.requires()):
        if dep in visited:
            continue
        visited = traverse(dep, visited)
    return visited
def deps(self):
    """Return the flattened Hadoop requirements followed by the local ones.

    Overrides the default implementation.
    """
    hadoop_deps = luigi.task.flatten(self.requires_hadoop())
    local_deps = luigi.task.flatten(self.requires_local())
    return hadoop_deps + local_deps
def _dump_task_log(self):
    """Record the output paths in the task log, then dump the log.

    Stores ``path()`` of every flattened output target under the
    ``'file_path'`` key of ``self.task_log`` and passes the log to
    ``self.dump`` with the target from ``self._get_task_log_target()``.
    """
    output_paths = []
    for target in luigi.task.flatten(self.output()):
        output_paths.append(target.path())
    self.task_log['file_path'] = output_paths
    self.dump(self.task_log, self._get_task_log_target())
def source_uris(self):
    """Return ``self._avro_uri(target)`` for every flattened input target."""
    uris = []
    for target in flatten(self.input()):
        uris.append(self._avro_uri(target))
    return uris