Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def thread_pool_executor():
from aiomisc.thread_pool import ThreadPoolExecutor
return ThreadPoolExecutor
def create_default_event_loop(pool_size=None, policy=event_loop_policy,
debug=False):
try:
asyncio.get_event_loop().close()
except RuntimeError:
pass # event loop is not created yet
asyncio.set_event_loop_policy(policy)
loop = asyncio.new_event_loop()
loop.set_debug(debug)
asyncio.set_event_loop(loop)
pool_size = pool_size or cpu_count()
thread_pool = ThreadPoolExecutor(pool_size)
loop.set_default_executor(thread_pool)
return loop, thread_pool