How to use the aiorwlock.current_task function in aiorwlock

To help you get started, we’ve selected a few aiorwlock examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github aio-libs / aiorwlock / tests / test_corner_cases.py View on Github external
def should_fail(timeout, loop):
    task = current_task(loop)

    handle = loop.call_later(timeout, task.cancel)
    try:
        yield
    except asyncio.CancelledError:
        handle.cancel()
        return
    else:
        msg = 'Inner task expected to be cancelled: {}'.format(task)
        pytest.fail(msg)
github aio-libs / aiorwlock / tests / test_rwlock.py View on Github external
async def task():
            tid = current_task(loop=self._loop)
            self.started.append(tid)
            try:
                await f()
            finally:
                self.finished.append(tid)
                while not self._can_exit:
                    await asyncio.sleep(0.01, loop=self._loop)
github aio-libs / aiorwlock / tests / test_rwlock.py View on Github external
def test_current_task(loop):
    with pytest.raises(RuntimeError):
        current_task(loop=loop)

    with pytest.raises(RuntimeError):
        current_task()
github aio-libs / aiorwlock / tests / test_rwlock.py View on Github external
def test_current_task(loop):
    with pytest.raises(RuntimeError):
        current_task(loop=loop)

    with pytest.raises(RuntimeError):
        current_task()

aiorwlock

Read write lock for asyncio.

Apache-2.0
Latest version published 27 days ago

Package Health Score

85 / 100
Full package analysis