Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_thread_pool_error_callback(self):
"""Thread Pool errors are forwarded to callback."""
with ThreadPool(max_workers=1) as pool:
future = pool.schedule(error_function)
future.add_done_callback(self.callback)
self.event.wait()
self.assertTrue(isinstance(self.exception, Exception))
def test_thread_pool_callback(self):
"""Thread Pool results are forwarded to the callback."""
with ThreadPool(max_workers=1) as pool:
future = pool.schedule(
function, args=[1], kwargs={'keyword_argument': 1})
future.add_done_callback(self.callback)
self.event.wait()
self.assertEqual(self.results, 2)
def test_thread_pool_multiple_futures(self):
"""Thread Pool multiple futures."""
futures = []
with ThreadPool(max_workers=1) as pool:
for _ in range(5):
futures.append(pool.schedule(function, args=[1]))
self.assertEqual(sum([t.result() for t in futures]), 5)
def test_thread_pool_schedule_id(self):
"""ThreadPool task ID is forwarded to it."""
with ThreadPool() as tp:
task = tp.schedule(jp, args=(1, ), identifier='foo')
self.assertEqual(task.id, 'foo')
def test_thread_pool_close(self):
"""ThreadPool is closed consuming all tasks."""
tp = ThreadPool()
for i in range(0, 10):
tp.schedule(jp, args=(1, ))
tp.close()
tp.join()
self.assertTrue(tp._queue.qsize() <= 1)
def test_thread_pool_different_thread(self):
"""Thread Pool multiple futures are handled by different threades."""
futures = []
with ThreadPool(max_workers=2) as pool:
for _ in range(0, 5):
futures.append(pool.schedule(tid_function))
self.assertEqual(len(set([t.result() for t in futures])), 2)
def test_thread_pool_stop_stopped_function(self):
"""Thread Pool is stopped in function."""
with ThreadPool(max_workers=1) as pool:
def function():
pool.stop()
pool.schedule(function)
self.assertFalse(pool.active)
def test_thread_pool_map(self):
"""Thread Pool map simple."""
elements = [1, 2, 3]
with ThreadPool(max_workers=1) as pool:
future = pool.map(function, elements)
generator = future.result()
self.assertEqual(list(generator), elements)
def test_thread_pool_map_zero_chunk(self):
"""Thread Pool map chunksize 0."""
with ThreadPool(max_workers=1) as pool:
with self.assertRaises(ValueError):
pool.map(function, [], chunksize=0)
def test_thread_pool_schedule_uuid(self):
"""ThreadPool task UUID is assigned if None."""
with ThreadPool() as tp:
task = tp.schedule(jp, args=(1, ))
self.assertEqual(task.id.version, 4)