Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
executor_label = self.tasks[task_id]["executor"]
try:
executor = self.executors[executor_label]
except Exception:
logger.exception("Task {} requested invalid executor {}: config is\n{}".format(task_id, executor_label, self._config))
if self.monitoring is not None and self.monitoring.resource_monitoring_enabled:
executable = self.monitoring.monitor_wrapper(executable, task_id,
self.monitoring.monitoring_hub_url,
self.run_id,
self.monitoring.resource_monitoring_interval)
with self.submitter_lock:
exec_fu = executor.submit(executable, *args, **kwargs)
self.tasks[task_id]['status'] = States.launched
if self.monitoring is not None:
task_log_info = self._create_task_log_info(task_id, 'lazy')
self.monitoring.send(MessageType.TASK_INFO, task_log_info)
exec_fu.retries_left = self._config.retries - \
self.tasks[task_id]['fail_count']
logger.info("Task {} launched on executor {}".format(task_id, executor.label))
try:
exec_fu.add_done_callback(partial(self.handle_exec_update, task_id))
except Exception as e:
logger.error("add_done_callback got an exception {} which will be ignored".format(e))
return exec_fu
pending = 0
runnable = 1
running = 2
done = 3
failed = 4
dep_fail = 5
retry = 6
launched = 7
# states from which we will never move to another state
FINAL_STATES = [States.done, States.failed, States.dep_fail]
# states which are final and which indicate a failure. This must
# be a subset of FINAL_STATES
FINAL_FAILURE_STATES = [States.failed, States.dep_fail]
class States(IntEnum):
    """Map states for tasks to an int.

    Members are ``IntEnum`` values, so each one compares equal to (and can be
    used as) its plain integer value.
    """

    unsched = -1
    pending = 0
    runnable = 1
    running = 2
    done = 3
    failed = 4
    dep_fail = 5
    retry = 6
    launched = 7


# States from which we will never move to another state.
FINAL_STATES = [States.done, States.failed, States.dep_fail]

# States which are final and which indicate a failure. This must
# be a subset of FINAL_STATES.
FINAL_FAILURE_STATES = [States.failed, States.dep_fail]
self.dep_funcs[task_id] = [self.fu_to_func[fu] for fu in depends]
self.dep_names[task_id] = [dep.__name__ for dep in self.dep_funcs[task_id]]
#print("dep funcs = {}".format(self.dep_funcs[task_id]))
#print("dep names = {}".format(self.dep_names[task_id]))
#dep_cnt = self._count_deps(depends, task_id)
task_def = { 'depends' : depends,
'func' : func,
'args' : args,
'kwargs' : kwargs,
'callback' : None,
'dep_cnt' : dep_cnt,
'exec_fu' : None,
'status' : States.unsched,
'app_fu' : None }
if task_id in self.tasks:
raise DuplicateTaskError("Task {0} in pending list".format(task_id))
else:
self.tasks[task_id] = task_def
# Extract stdout and stderr to pass to AppFuture:
task_stdout = kwargs.get('stdout', None)
task_stderr = kwargs.get('stderr', None)
if dep_cnt == 0 :
# Set to running
new_args, kwargs, exceptions = self.sanitize_and_wrap(task_id, args, kwargs)
if not exceptions:
self.tasks[task_id]['exec_fu'] = self.launch_task(task_id, func, *new_args, **kwargs)
or callback.
"""
if self._count_deps(self.tasks[task_id]['depends']) == 0:
# We can now launch *task*
new_args, kwargs, exceptions = self.sanitize_and_wrap(task_id,
self.tasks[task_id]['args'],
self.tasks[task_id]['kwargs'])
self.tasks[task_id]['args'] = new_args
self.tasks[task_id]['kwargs'] = kwargs
if not exceptions:
# There are no dependency errors
exec_fu = None
# Acquire a lock, retest the state, launch
with self.tasks[task_id]['task_launch_lock']:
if self.tasks[task_id]['status'] == States.pending:
exec_fu = self.launch_task(
task_id, self.tasks[task_id]['func'], *new_args, **kwargs)
else:
logger.info(
"Task {} failed due to dependency failure".format(task_id))
# Raise a dependency exception
self.tasks[task_id]['status'] = States.dep_fail
if self.monitoring is not None:
task_log_info = self._create_task_log_info(task_id, 'lazy')
self.monitoring.send(MessageType.TASK_INFO, task_log_info)
exec_fu = Future()
exec_fu.retries_left = 0
exec_fu.set_exception(DependencyError(exceptions,
task_id,
task_id, self.tasks[task_id]['func'], *new_args, **kwargs)
if exec_fu:
self.tasks[task_id]['exec_fu'] = exec_fu
try:
self.tasks[task_id]['app_fu'].update_parent(exec_fu)
self.tasks[task_id]['exec_fu'] = exec_fu
except AttributeError as e:
logger.error(
"Task {}: Caught AttributeError at update_parent".format(task_id))
raise e
else:
logger.info(
"Task {} failed due to dependency failure".format(task_id))
# Raise a dependency exception
self.tasks[task_id]['status'] = States.dep_fail
if self.monitoring is not None:
task_log_info = self._create_task_log_info(task_id, 'lazy')
self.monitoring.send(MessageType.TASK_INFO, task_log_info)
try:
fu = Future()
fu.retries_left = 0
self.tasks[task_id]['exec_fu'] = fu
self.tasks[task_id]['app_fu'].update_parent(fu)
fu.set_exception(DependencyError(exceptions,
task_id,
None))
except AttributeError as e:
logger.error(
"Task {} AttributeError at update_parent".format(task_id))
def __init__(self, client: Any, logger: Optional[logging.Logger] = None, **kwargs) -> None:
    """Initialise the adapter and wrap its client in a Parsl DataFlowKernel.

    Args:
        client: Forwarded to ``BaseAdapter.__init__``. Afterwards the value
            held in ``self.client`` is passed to ``DataFlowKernel``
            (presumably a Parsl configuration object -- confirm with callers).
        logger: Optional logger forwarded to ``BaseAdapter.__init__``.
        **kwargs: Extra keyword arguments forwarded unchanged to
            ``BaseAdapter.__init__``.
    """
    BaseAdapter.__init__(self, client, logger, **kwargs)

    # Imported inside the method rather than at module level, so parsl is
    # only loaded when an adapter instance is actually constructed.
    import parsl

    # Rebind self.client: from here on it is the DataFlowKernel built from the
    # value that was in self.client (assumed to be set by BaseAdapter.__init__
    # -- TODO confirm).
    self.client = parsl.dataflow.dflow.DataFlowKernel(self.client)
    # Keep a handle on Parsl's task-state enum.
    self._parsl_states = parsl.dataflow.states.States
    # Starts out empty; populated elsewhere.
    self.app_map = {}
else:
self.tasks[task_id]['exec_fu'] = None
app_fu = AppFuture(self.tasks[task_id]['exec_fu'],
tid=task_id,
stdout=task_stdout,
stderr=task_stderr)
app_fu.set_exception(DependencyError(exceptions, "Failures in input dependencies", None))
self.tasks[task_id]['app_fu'] = app_fu
self.tasks[task_id]['status'] = States.dep_fail
else:
# Send to pending, create the AppFuture with no parent and have it set
# when an executor future is available.
self.tasks[task_id]['app_fu'] = AppFuture(None, tid=task_id,
stdout=task_stdout,
stderr=task_stderr)
self.tasks[task_id]['status'] = States.pending
#logger.debug("Task:%s Launched with AppFut:%s", task_id, task_def['app_fu'])
fu = task_def['app_fu'] # This was the return value
## End of Parsl code
self.fu_to_func[fu] = func
# Replace futures with Tasks in args for Task definition
new_args, new_kwargs = self.parse_args(args, kwargs)
# Create Task
task = wo.PythonFunctionTask(
name=task_name,
or callback.
"""
if self._count_deps(self.tasks[task_id]['depends']) == 0:
# We can now launch *task*
new_args, kwargs, exceptions = self.sanitize_and_wrap(task_id,
self.tasks[task_id]['args'],
self.tasks[task_id]['kwargs'])
self.tasks[task_id]['args'] = new_args
self.tasks[task_id]['kwargs'] = kwargs
if not exceptions:
# There are no dependency errors
exec_fu = None
# Acquire a lock, retest the state, launch
with self.tasks[task_id]['task_launch_lock']:
if self.tasks[task_id]['status'] == States.pending:
exec_fu = self.launch_task(
task_id, self.tasks[task_id]['func'], *new_args, **kwargs)
if exec_fu:
self.tasks[task_id]['exec_fu'] = exec_fu
try:
self.tasks[task_id]['app_fu'].update_parent(exec_fu)
self.tasks[task_id]['exec_fu'] = exec_fu
except AttributeError as e:
logger.error(
"Task {}: Caught AttributeError at update_parent".format(task_id))
raise e
else:
logger.info(
"Task {} failed due to dependency failure".format(task_id))
# Raise a dependency exception