Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
to log if it was successful...)
Returns:
- None
Updates the super() with the result() or exception()
"""
# print("[RETRY:TODO] parent_Callback for {0}".format(executor_fu))
with self._update_lock:
if not executor_fu.done():
raise ValueError("done callback called, despite future not reporting itself as done")
# this is for consistency checking
if executor_fu != self.parent:
if executor_fu.exception() is None and not isinstance(executor_fu.result(), RemoteExceptionWrapper):
# ... then we completed with a value, not an exception or wrapped exception,
# but we've got an updated executor future.
# This is bad - for example, we've started a retry even though we have a result
raise ValueError("internal consistency error: AppFuture done callback called without an exception, but parent has been changed since then")
try:
res = executor_fu.result()
if isinstance(res, RemoteExceptionWrapper):
res.reraise()
super().set_result(executor_fu.result())
except Exception as e:
if executor_fu.retries_left > 0:
# ignore this exception, because assume some later
# parent executor, started external to this class,
logger.debug("Synced")
task_request = b'TREQ'

# Worker main loop: request a task from rank 0, execute it, and send back a
# pickled result package. No exit condition is visible in this excerpt.
while True:
    comm.send(task_request, dest=0, tag=TASK_REQUEST_TAG)
    # The worker will receive {'task_id':, 'buffer':}
    req = comm.recv(source=0, tag=rank)
    logger.debug("Got req: {}".format(req))
    tid = req['task_id']
    logger.debug("Got task: {}".format(tid))

    try:
        result = execute_task(req['buffer'])
        # Serialize inside the try: a result that cannot be serialized is
        # then reported as this task's exception instead of escaping the
        # loop and leaving the task without any reply.
        serialized_result = serialize_object(result)
    except Exception as e:
        result_package = {'task_id': tid, 'exception': serialize_object(RemoteExceptionWrapper(*sys.exc_info()))}
        logger.debug("No result due to exception: {} with result package {}".format(e, result_package))
    else:
        result_package = {'task_id': tid, 'result': serialized_result}
        logger.debug("Result: {}".format(result))

    # Exactly one package (result or exception) is sent per received task.
    pkl_package = pickle.dumps(result_package)
    comm.send(pkl_package, dest=0, tag=RESULT_TAG)
# The callback is only meaningful for a future that has already finished.
if not executor_fu.done():
    raise ValueError("done callback called, despite future not reporting itself as done")

# Consistency check: if this future is no longer the current parent, it must
# have ended with an exception or a wrapped exception. Ending with a plain
# value while the parent was replaced means, for example, that a retry was
# started even though a result already existed.
finished_with_value_after_swap = (
    executor_fu != self.parent
    and executor_fu.exception() is None
    and not isinstance(executor_fu.result(), RemoteExceptionWrapper)
)
if finished_with_value_after_swap:
    raise ValueError("internal consistency error: AppFuture done callback called without an exception, but parent has been changed since then")

try:
    outcome = executor_fu.result()
    if isinstance(outcome, RemoteExceptionWrapper):
        outcome.reraise()
    super().set_result(executor_fu.result())
except Exception as err:
    # While retries remain, the failure is deliberately dropped: a later
    # parent executor future, started outside this class, is expected to
    # provide the answer. Only the final failure is published.
    if not (executor_fu.retries_left > 0):
        super().set_exception(err)
def wrapper(*args, **kwargs):
    """Call ``func`` and return its value, or a wrapped exception on failure.

    Any ``Exception`` raised by ``func`` is captured from ``sys.exc_info()``
    and returned as a ``RemoteExceptionWrapper`` rather than propagated.
    """
    # Imported at call time, inside the function body, as in the original.
    import sys
    from parsl.app.errors import RemoteExceptionWrapper

    try:
        outcome = func(*args, **kwargs)
    except Exception:
        outcome = RemoteExceptionWrapper(*sys.exc_info())
    return outcome
return wrapper
def get_result(self, task_id, block=False):
    """Return the stored result for ``task_id`` and remove it from the store.

    Args:
        task_id: Key looked up in ``self._pending`` and ``self._results``.
        block (bool): When True, wait until ``task_id`` appears in
            ``self._results`` before continuing.

    Returns:
        The stored result, unless it is a ``RemoteExceptionWrapper``.

    Raises:
        ValueError: ``task_id`` is in neither ``self._pending`` nor
            ``self._results``.
        Exception: No result is stored; the message is
            "Task result already returned" when ``task_id`` is in
            ``self._completed_tasks``, otherwise "Task pending".
        Any exception raised by ``reraise()`` when the stored result is a
        ``RemoteExceptionWrapper``.
    """
    import time

    if task_id not in self._pending and task_id not in self._results:
        print('Pending:', self._pending.keys())
        raise ValueError('Unknown task id {}'.format(task_id))

    if block:
        # Poll with a short sleep instead of spinning: a bare busy-wait
        # pins a CPU core for as long as the result is outstanding.
        # NOTE(review): presumably another thread fills self._results.
        while task_id not in self._results:
            time.sleep(0.001)

    if task_id in self._results:
        # A result is handed out once; pop removes it in the same step.
        res = self._results.pop(task_id)
        if isinstance(res, RemoteExceptionWrapper):
            res.reraise()
        else:
            return res
    elif task_id in self._completed_tasks:
        raise Exception("Task result already returned")
    else:
        raise Exception("Task pending")
# this is raised in the main context.
for task in self.tasks:
self.tasks[task].set_exception(self._executor_exception)
break
# Settle the future registered for this task id from the incoming message.
task_fut = self.tasks[tid]
if 'result' in msg:
    result, _ = deserialize_object(msg['result'])
    task_fut.set_result(result)
elif 'exception' not in msg:
    raise BadMessage("Message received is neither result or exception")
else:
    try:
        payload, _ = deserialize_object(msg['exception'])
        # payload should be a RemoteExceptionWrapper... so we can reraise it
        if isinstance(payload, RemoteExceptionWrapper):
            try:
                payload.reraise()
            except Exception as raised:
                task_fut.set_exception(raised)
        elif isinstance(payload, Exception):
            task_fut.set_exception(payload)
        else:
            raise ValueError("Unknown exception-like type received: {}".format(type(payload)))
    except Exception as handling_err:
        # Anything that goes wrong while decoding or applying the exception
        # is reported on the future as a DeserializationError.
        # TODO could be a proper wrapped exception?
        task_fut.set_exception(
            DeserializationError("Received exception, but handling also threw an exception: {}".format(handling_err)))
if not self.is_alive:
def handle_exec_update(self, task_id, future):
"""This function is called only as a callback from an execution
attempt reaching a final state (either successfully or failing).
It will launch retries if necessary, and update the task
structure.
Args:
task_id (string) : Task id which is a uuid string
future (Future) : The future object corresponding to the task which
makes this callback
"""
# NOTE(review): only the failure bookkeeping and the eager-failure exit are
# visible in this excerpt; the retry launch mentioned in the docstring is not
# shown here - confirm against the full source.
try:
res = future.result()
# A failure may also arrive as a RemoteExceptionWrapper returned as the
# result; reraise() presumably raises the wrapped exception so that it
# reaches the same handler as a failed future.
if isinstance(res, RemoteExceptionWrapper):
res.reraise()
except Exception as e:
logger.debug("Task {} failed".format(task_id))
# We keep the history separately, since the future itself could be
# tossed.
self.tasks[task_id]['fail_history'].append(str(e))
self.tasks[task_id]['fail_count'] += 1
# With lazy_errors disabled the task is marked failed immediately.
if not self._config.lazy_errors:
logger.exception("Eager fail, skipping retry logic")
self.tasks[task_id]['status'] = States.failed
# When monitoring is configured, a TASK_INFO message is sent for this task.
if self.monitoring:
task_log_info = self._create_task_log_info(task_id, 'eager')
self.monitoring.send(MessageType.TASK_INFO, task_log_info)
return
# Worker loop: receive a task over the socket, execute it, and send back one
# pickled package holding either the serialized result or a serialized
# RemoteExceptionWrapper.
while True:
# This task receiver socket is blocking.
# The first frame is the task id; any remaining frames form the task buffer.
b_task_id, *buf = funcx_worker_socket.recv_multipart()
# msg = task_socket.recv_pyobj()
logger.debug("Got buffer : {}".format(buf))
# The task id is decoded from little-endian bytes.
task_id = int.from_bytes(b_task_id, "little")
logger.info("Received task {}".format(task_id))
try:
result = execute_task(buf)
# Serialization happens inside the try, so a failure to serialize the
# result is handled by the same except clause as a failing task.
serialized_result = serialize_object(result)
except Exception:
result_package = {'task_id': task_id, 'exception': serialize_object(RemoteExceptionWrapper(*sys.exc_info()))}
logger.debug("Got exception something")
else:
result_package = {'task_id': task_id, 'result': serialized_result}
logger.info("Completed task {}".format(task_id))
pkl_package = pickle.dumps(result_package)
funcx_worker_socket.send_multipart([pkl_package])
# When no_reuse is set, the loop exits via break.
if no_reuse:
break
if received is not False:
    # Successful task: the item carries a nested result record.
    result = item["result"]
    future_update = result["result"]
    logger.debug("Updating Future for Parsl Task {}".format(parsl_tid))
    if result["failure"] is not False:
        # The payload is a pickled argument tuple for RemoteExceptionWrapper;
        # re-raising it yields the exception object to store on the future.
        # NOTE(review): pickle.loads can execute arbitrary code on untrusted
        # input - confirm future_update only ever comes from a trusted source.
        future_fail = pickle.loads(future_update)
        exc = RemoteExceptionWrapper(*future_fail)
        try:
            exc.reraise()
        except Exception as raised:
            future.set_exception(raised)
    else:
        future.set_result(future_update)
else:
    # Failed task: surface the reported reason and status as an AppFailure.
    reason = item["reason"]
    status = item["status"]
    future.set_exception(AppFailure(reason, status))
logger.debug("Exiting Collector Thread")
return