Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _wait_for_task(task, channel, event, timeout):
    """Wait on *event* for up to *timeout* and report the task outcome.

    The outcome is returned, never raised:

    * the value of ``channel.recv()`` when ``event.wait(timeout=timeout)``
      is true and ``task.cancelled`` is false;
    * a ``TaskCancelled('Task cancelled')`` instance when the wait
      succeeded but ``task.cancelled`` is true;
    * a ``TimeoutError('Task timeout')`` instance when the wait returned a
      false value.
    """
    # A false wait result is treated as an expired timeout.
    if not event.wait(timeout=timeout):
        return TimeoutError('Task timeout')
    # The event fired: cancellation takes priority over reading the channel.
    if task.cancelled:
        return TaskCancelled('Task cancelled')
    return channel.recv()
Starts a new worker, waits for the *Task* to be performed,
collects results, runs the callback and cleans up the process.
"""
queue = task._queue
function = task._function
timeout = task.timeout > 0 and task.timeout or None
process = task_worker(queue, function, task._args, task._kwargs)
try:
results = queue.get(timeout)
task._set(results)
except Empty:
task._set(TimeoutError('Task Timeout', timeout))
process.terminate()
process.join()
if task._callback is not None:
try:
task._callback(task)
except Exception:
print_exc()
If *timeout* is greater than 0,
it blocks until all workers have exited or raises TimeoutError.
"""
if self._context.state == RUNNING:
raise RuntimeError('The Pool is still running')
self._connection.close()
if timeout > 0:
# wait for Pool processes
self._context.pool = self._join_workers(timeout)
# verify timeout expired
if len(self._context.pool) > 0:
raise TimeoutError('Workers are still running')
else:
self._context.pool = self._join_workers()
def recv(timeout=None):
    """Receive one object from ``self.reader`` while holding ``self.rlock``.

    ``self.reader.poll(timeout)`` is consulted first; when it reports
    nothing to read, ``TimeoutError("Channel timeout")`` is raised.
    Otherwise the value of ``self.reader.recv()`` is returned.

    NOTE(review): ``self`` is not a parameter here, so it is presumably
    captured from an enclosing scope that is not visible in this excerpt.
    """
    with self.rlock:
        # Bail out first when polling finds nothing within the timeout.
        if not self.reader.poll(timeout):
            raise TimeoutError("Channel timeout")
        return self.reader.recv()
def timeout_tasks(self, workers):
    """Stop each worker in *workers* and flag its current task as timed out.

    For every worker, in this order: fetch its current task, stop the
    worker, set the context's ``workers_event``, store a
    ``TimeoutError('Task timeout')`` on the task via ``task._set``, run
    ``self.task_callback(task)`` when the task has a callback, and finally
    call ``task_done()`` on the context's queue.
    """
    context = self.context
    task_queue = context.queue
    wake_event = context.workers_event

    for expired in workers:
        # Grab the task before stopping the worker that owns it.
        task = expired.get_current()
        expired.stop()
        wake_event.set()
        task._set(TimeoutError('Task timeout'))
        if task._callback is not None:
            self.task_callback(task)
        # One task_done() per timed-out task, callback or not.
        task_queue.task_done()