Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_process_pool_timeout_callback(self):
"""Process Pool Spawn TimeoutError is forwarded to callback."""
with ProcessPool(max_workers=1) as pool:
future = pool.schedule(long_function, timeout=0.1)
future.add_done_callback(self.callback)
self.event.wait()
self.assertTrue(isinstance(self.exception, TimeoutError))
def test_process_pool_callback(self):
"""Process Pool Forkserver result is forwarded to the callback."""
with ProcessPool(max_workers=1) as pool:
future = pool.schedule(
function, args=[1], kwargs={'keyword_argument': 1})
future.add_done_callback(self.callback)
self.event.wait()
self.assertEqual(self.result, 2)
def test_process_pool_future_limit(self):
"""Process Pool Forkserver tasks limit is honored."""
futures = []
with ProcessPool(max_workers=1, max_tasks=2) as pool:
for _ in range(0, 4):
futures.append(pool.schedule(pid_function))
self.assertEqual(len(set([f.result() for f in futures])), 2)
def test_thread_pool_error_callback(self):
"""Thread Pool errors are forwarded to callback."""
with ThreadPool(max_workers=1) as pool:
future = pool.schedule(error_function)
future.add_done_callback(self.callback)
self.event.wait()
self.assertTrue(isinstance(self.exception, Exception))
def test_thread_pool_callback(self):
"""Thread Pool results are forwarded to the callback."""
with ThreadPool(max_workers=1) as pool:
future = pool.schedule(
function, args=[1], kwargs={'keyword_argument': 1})
future.add_done_callback(self.callback)
self.event.wait()
self.assertEqual(self.results, 2)
def test_process_pool_task_limit(self):
"""Process Pool task limit is honored."""
tasks = []
with process.Pool(task_limit=2) as pool:
for i in range(0, 4):
tasks.append(pool.schedule(pid_function))
self.assertEqual(len(set([t.get() for t in tasks])), 2)
@process.concurrent
def critical_decorated():
os._exit(123)
def test_undecorated_callback(self):
"""Process Concurrent undecorated results are forwarded to callback."""
task = process.concurrent(target=undecorated, args=[1],
kwargs={'keyword_argument': 1},
callback=self.callback)
event.wait()
self.assertEqual(task.get(), 2)
def test_cancel_decorated_callback(self):
"""Process Concurrent TaskCancelled is forwarded to callback."""
task = long_decorated_callback()
task.cancel()
event.wait()
self.assertTrue(isinstance(exception, TaskCancelled))
def test_thread_pool_cancel(self):
"""ThreadPoolDecorator callback gets notification if Task is cancelled."""
tjob_pool_long.callback = self.error_callback
task = tjob_pool_long(1, 1)
task.cancel()
event.wait()
self.assertTrue(isinstance(self.exception, TaskCancelled))