How to use the aiomisc.thread_pool.ThreadPoolExecutor class in aiomisc

To help you get started, here are a few examples of how aiomisc.thread_pool.ThreadPoolExecutor is used in public projects.

Source: aiokitchen/aiomisc, aiomisc_pytest/pytest_plugin.py (GitHub)
def thread_pool_executor():
    from aiomisc.thread_pool import ThreadPoolExecutor
    return ThreadPoolExecutor
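
The function above returns the executor class itself rather than an instance, so the caller decides the pool size and the lifetime of the pool. A minimal sketch of that pattern follows. It assumes the standard library's concurrent.futures.ThreadPoolExecutor as a stand-in for the aiomisc class so that it runs without aiomisc installed; the square helper and the variable names are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor  # stand-in for aiomisc.thread_pool.ThreadPoolExecutor


def thread_pool_executor():
    # Return the class, not an instance: the caller chooses the pool size.
    return ThreadPoolExecutor


def square(x):
    # Hypothetical work item used only for this illustration.
    return x * x


executor_cls = thread_pool_executor()

# The context manager shuts the pool down once the work is finished.
with executor_cls(max_workers=2) as pool:
    results = list(pool.map(square, [1, 2, 3]))
```

Returning the class keeps the helper free of resources that would otherwise need cleaning up, at the cost of leaving shutdown to each caller.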
Source: aiokitchen/aiomisc, aiomisc/utils.py (GitHub)
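import asyncio
from multiprocessing import cpu_count  # assumed import; the excerpt does not show it

from aiomisc.thread_pool import ThreadPoolExecutor

# NOTE: event_loop_policy is a module-level default defined elsewhere in
# aiomisc/utils.py; its definition is not part of this excerpt.


def create_default_event_loop(pool_size=None, policy=event_loop_policy,
                              debug=False):
    # Close any event loop that already exists for this thread.
    try:
        asyncio.get_event_loop().close()
    except RuntimeError:
        pass  # event loop is not created yet

    asyncio.set_event_loop_policy(policy)

    # Create a fresh loop and make it the current one.
    loop = asyncio.new_event_loop()
    loop.set_debug(debug)
    asyncio.set_event_loop(loop)

    # Default to one worker thread per CPU, and use the pool for every
    # loop.run_in_executor(None, ...) call.
    pool_size = pool_size or cpu_count()
    thread_pool = ThreadPoolExecutor(pool_size)
    loop.set_default_executor(thread_pool)

    return loop, thread_pool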
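
Once a pool is installed with loop.set_default_executor, any call to loop.run_in_executor(None, ...) runs on that pool. The sketch below shows this with a blocking function. It assumes the standard library's concurrent.futures.ThreadPoolExecutor in place of the aiomisc class so that it runs without aiomisc installed; blocking_io, main and run_with_default_pool are hypothetical names, not part of aiomisc.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor  # stand-in for aiomisc.thread_pool.ThreadPoolExecutor


def blocking_io(delay):
    # Hypothetical blocking call that would otherwise stall the event loop.
    time.sleep(delay)
    return "done"


async def main():
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default executor.
    return await loop.run_in_executor(None, blocking_io, 0.01)


def run_with_default_pool(pool_size=4):
    loop = asyncio.new_event_loop()
    thread_pool = ThreadPoolExecutor(pool_size)
    loop.set_default_executor(thread_pool)
    try:
        return loop.run_until_complete(main())
    finally:
        # Release the worker threads and the loop when finished.
        thread_pool.shutdown(wait=True)
        loop.close()


result = run_with_default_pool()
```

Keeping the pool and the loop together, as create_default_event_loop does by returning both, makes it straightforward to shut the pool down when the loop is closed.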