Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_waitforqueues_multiple(self):
    """Waitforqueues reports every queue once all of them are ready."""
    queue_count = 3
    for position in range(queue_count):
        launch_thread(
            None, queue_function, True, self.queues, position, 0.01)

    # Pause well beyond the 0.01 handed to each queue_function call before
    # checking (presumably that value is a delay in seconds -- confirm).
    time.sleep(0.1)

    ready = list(waitforqueues(self.queues))
    self.assertEqual(ready, self.queues)
def test_waitforthreads_restore(self):
    """Waitforthreads leaves threading's get_ident as it found it."""

    def current_get_ident():
        # Use the public name when present, else fall back to the
        # private ``_get_ident`` attribute.
        if hasattr(threading, 'get_ident'):
            return threading.get_ident
        return threading._get_ident

    expected = current_get_ident()

    worker = launch_thread(None, thread_function, True, 0)
    time.sleep(0.01)
    waitforthreads([worker])

    # Whatever the threading module exposes now must compare equal to what
    # it exposed before waitforthreads ran.
    self.assertEqual(current_get_ident(), expected)
def test_waitforqueues_single(self):
    """Waitforqueues reports the one queue that becomes ready."""
    # Only index 0 of self.queues is handed to queue_function here.
    launch_thread(None, queue_function, True, self.queues, 0, 0.01)

    ready = list(waitforqueues(self.queues))
    self.assertEqual(ready[0], self.queues[0])
def test_waitforthreads_multiple(self):
    """Waitforthreads reports every thread once all have finished."""
    threads = [
        launch_thread(None, thread_function, True, 0.01)
        for _ in range(5)
    ]

    # Pause well beyond the 0.01 handed to each thread_function call before
    # checking (presumably that value is a delay in seconds -- confirm).
    time.sleep(0.1)

    finished = list(waitforthreads(threads))
    self.assertEqual(finished, threads)
def test_waitforqueues_timeout(self):
    """Waitforqueues yields nothing when the timeout elapses first."""
    # queue_function is given 0.1 while the wait below is capped at 0.01,
    # so the wait is expected to give up before any queue is ready.
    launch_thread(None, queue_function, True, self.queues, 0, 0.1)

    ready = list(waitforqueues(self.queues, timeout=0.01))
    self.assertEqual(ready, [])
def wrapper(*args, **kwargs):
    # Dispatch the call through launch_thread and give the caller the
    # Future that is handed to _function_handler (presumably fulfilled
    # there -- confirm). ``name``, ``daemon`` and ``function`` come from
    # the enclosing scope.
    pending = Future()
    launch_thread(
        name, _function_handler, daemon, function, args, kwargs, pending)
    return pending
def _start_pool(self):
    """Start the pool manager and its three loop threads, at most once.

    While holding the context's state mutex, nothing happens unless the
    state is still CREATED; otherwise the manager is started, the loops
    are launched and the state becomes RUNNING.
    """
    with self._context.state_mutex:
        if self._context.state != CREATED:
            return

        self._pool_manager.start()

        # Launch order is preserved: scheduler, pool manager, messages.
        loop_functions = (
            task_scheduler_loop,
            pool_manager_loop,
            message_manager_loop,
        )
        self._loops = tuple(
            launch_thread(None, loop_function, True, self._pool_manager)
            for loop_function in loop_functions
        )

        self._context.state = RUNNING
def _start_pool(self):
    """Start the pool manager, launch its three loops and mark RUNNING."""
    self._pool_manager.start()

    # Launch order is preserved: scheduler, pool manager, messages.
    loop_functions = (
        task_scheduler_loop,
        pool_manager_loop,
        message_manager_loop,
    )
    self._loops = tuple(
        launch_thread(loop_function, self._pool_manager)
        for loop_function in loop_functions
    )

    self._context.state = RUNNING
def create_workers(self):
    """Launch one worker_thread per missing worker and record each one.

    The number launched is ``self.context.workers`` minus the number of
    entries already in ``self.workers``; nothing is launched when that
    difference is zero or negative.
    """
    missing = self.context.workers - len(self.workers)
    for _ in range(missing):
        self.workers.append(
            launch_thread(None, worker_thread, True, self.context))
def _start_pool(self):
    """Start the pool manager and its loop thread, at most once.

    While holding the context's state mutex, nothing happens unless the
    state is still CREATED; otherwise the manager is started, the manager
    loop is launched and the state becomes RUNNING.
    """
    with self._context.state_mutex:
        if self._context.state != CREATED:
            return

        self._pool_manager.start()

        manager_thread = launch_thread(
            None, pool_manager_loop, True, self._pool_manager)
        # Kept as a one-element tuple, matching the original shape.
        self._loops = (manager_thread,)

        self._context.state = RUNNING