Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@thread.concurrent(callback=callback)
def error_decorated_callback():
raise Exception("BOOM!")
@thread(callback, 5)
def wrong(argument, keyword_argument=0):
return argument + keyword_argument
except Exception as error:
@thread.spawn(name='foo')
def decorated(queue, argument, keyword_argument=0):
"""A docstring."""
queue.put(argument + keyword_argument)
@thread
def tjob_long():
time.sleep(1)
return 1
def test_thread_pool_queue_factory(self):
"""Thread Pool queue factory is called."""
with thread.Pool(queue_factory=queue_factory) as pool:
self.assertEqual(pool._context.task_queue.maxsize, 5)
@thread(callback=callback)
def tjob_callback(argument, keyword_argument=0):
return argument + keyword_argument
@thread
def tjob_count():
return None
@thread.spawn(name='pool_manager')
def pool_manager_loop(pool):
while pool.alive:
reset_expired_workers(pool)
manage_tasks(pool)
time.sleep(0.1)