Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_calls_failure_cleanups_on_exception(self):
    """Check that failure cleanups run when the submission task fails.

    An ``ExceptionSubmissionTask`` is built and invoked after a cleanup
    callback has been registered on the transfer coordinator. The test
    then checks that the coordinator reports ``'failed'`` and that the
    cleanup callback was invoked exactly once.
    """
    task = self.get_task(
        ExceptionSubmissionTask, main_kwargs=self.main_kwargs)

    # Record every cleanup invocation so it can be inspected afterwards.
    cleanup_log = []
    on_failure = FunctionContainer(cleanup_log.append, 'cleanup happened')
    self.transfer_coordinator.add_failure_cleanup(on_failure)

    task()

    # The coordinator must report the failure ...
    self.assertEqual(self.transfer_coordinator.status, 'failed')
    # ... and the registered cleanup must have been invoked once.
    self.assertEqual(cleanup_log, ['cleanup happened'])
def add_done_callback_to_future(self, future, fn, *args, **kwargs):
    """Register ``fn`` with its arguments as a done callback on ``future``.

    :param future: Object exposing ``add_done_callback``.
    :param fn: Callable to wrap in a ``FunctionContainer``.
    :param args: Positional arguments forwarded to ``FunctionContainer``.
    :param kwargs: Keyword arguments forwarded to ``FunctionContainer``.
    """
    # Bundle the callable and its arguments into one object, since
    # only that single object is handed to add_done_callback.
    wrapped_fn = FunctionContainer(fn, *args, **kwargs)
    future.add_done_callback(wrapped_fn)
def test_submission_task_announces_done_if_cancelled_before_main(self):
    """Check that done is announced when cancelled before the task runs.

    A done callback is registered on the transfer coordinator and the
    coordinator is cancelled before a ``NOOPSubmissionTask`` is created
    and invoked. The done callback must still be invoked exactly once.
    """
    done_log = []
    on_done = FunctionContainer(done_log.append, 'done announced')
    self.transfer_coordinator.add_done_callback(on_done)

    # Cancel first so the task only ever sees a cancelled transfer.
    self.transfer_coordinator.cancel()

    task = self.get_task(
        NOOPSubmissionTask, main_kwargs=self.main_kwargs)
    task()

    # The submission task was cancelled before it ran, so it submitted
    # no further tasks. As a result it is responsible for announcing
    # done itself, because nothing else will.
    self.assertEqual(done_log, ['done announced'])
# +--submits-->SubmitMoreTasksTask
# |
# +--submits-->SuccessTask
# |
# +-->sleeps-->adds failure cleanup
#
# In the end, the failure cleanup of the SuccessTask should be run
# when the ExceptionSubmissionTask fails. If the
# ExceptionSubmissionTask did not run the failure cleanup, it is most
# likely that it did not wait for the SuccessTask to complete, which
# it needs to do because the ExceptionSubmissionTask does not know
# what failure cleanups it needs to run until all spawned tasks have
# completed.
invocations_of_cleanup = []
event = Event()
cleanup_callback = FunctionContainer(
invocations_of_cleanup.append, 'cleanup happened')
cleanup_task = self.get_task(
SuccessTask, main_kwargs={
'callbacks': [event.set],
'failure_cleanups': [cleanup_callback]
}
)
task_for_submitting_cleanup_task = self.get_task(
SubmitMoreTasksTask, main_kwargs={
'executor': self.executor,
'tasks_to_submit': [cleanup_task]
}
)
self.main_kwargs['executor'] = self.executor
False, if not to wait and raise an error if not able to submit
a task.
:returns: The future associated with the submitted task
"""
semaphore = self._semaphore
# If a tag was provided, use the semaphore associated to that
# tag.
if tag:
semaphore = self._tag_semaphores[tag]
# Call acquire on the semaphore.
acquire_token = semaphore.acquire(task.transfer_id, block)
# Create a callback to invoke when task is done in order to call
# release on the semaphore.
release_callback = FunctionContainer(
semaphore.release, task.transfer_id, acquire_token)
# Submit the task to the underlying executor.
future = ExecutorFuture(self._executor.submit(task))
# Add the Semaphore.release() callback to the future such that
# it is invoked once the future completes.
future.add_done_callback(release_callback)
return future
def _get_final_io_task_submission_callback(self, download_manager,
                                           io_executor):
    """Build the callback that submits the final IO task.

    :param download_manager: Object providing ``get_final_io_task()``.
    :param io_executor: Executor passed through as the first argument
        to the coordinator's ``submit``.
    :returns: A ``FunctionContainer`` wrapping
        ``self._transfer_coordinator.submit`` together with
        ``io_executor`` and the final IO task.
    """
    # The final task is fetched now, when the callback is built.
    last_io_task = download_manager.get_final_io_task()
    submit = self._transfer_coordinator.submit
    return FunctionContainer(submit, io_executor, last_io_task)
def _get_final_io_task_submission_callback(self, download_manager,
                                           io_executor):
    """Return a container that submits the download's final IO task.

    The final IO task is obtained from ``download_manager`` when this
    method is called. It is then packaged with ``io_executor`` as the
    arguments for ``self._transfer_coordinator.submit``.

    :param download_manager: Source of the final IO task, via
        ``get_final_io_task()``.
    :param io_executor: Executor handed to ``submit``.
    :returns: The ``FunctionContainer`` holding ``submit`` and its
        arguments.
    """
    io_task = download_manager.get_final_io_task()
    submit_args = (io_executor, io_task)
    return FunctionContainer(
        self._transfer_coordinator.submit, *submit_args)