How to use the uvloop._testbase.AIOTestCase function in uvloop

To help you get started, we’ve selected a few uvloop examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github MagicStack / uvloop / tests / test_process.py View on Github external
with tempfile.NamedTemporaryFile('w') as stderr:
                with contextlib.redirect_stdout(stdout):
                    with contextlib.redirect_stderr(stderr):
                        self.loop.run_until_complete(test())

                stdout.flush()
                stderr.flush()

                with open(stdout.name, 'rb') as so:
                    self.assertEqual(so.read(), b'out\n')

                with open(stderr.name, 'rb') as se:
                    self.assertEqual(se.read(), b'err\n')


class Test_AIO_Process(_TestProcess, tb.AIOTestCase):
    pass


class TestAsyncio_UV_Process(_AsyncioTests, tb.UVTestCase):
    pass


class TestAsyncio_AIO_Process(_AsyncioTests, tb.AIOTestCase):
    pass


class Test_UV_Process_Delayed(tb.UVTestCase):

    class TestProto:
        def __init__(self):
            self.lost = 0
github MagicStack / uvloop / tests / test_sockets.py View on Github external
self.loop.remove_writer(s.fileno())
            s.close()
            self.assertEqual(s.fileno(), -1)

        s = socket.socket()
        with s:
            s.setblocking(False)
            self.loop.add_writer(s, lambda: None)
            self.loop.add_writer(s, lambda: None)
            self.loop.add_writer(s, lambda: None)
            self.loop.remove_writer(s)
            s.close()
            self.assertEqual(s.fileno(), -1)


class TestAIOSockets(_TestSockets, tb.AIOTestCase):
    pass
github MagicStack / uvloop / tests / test_unix.py View on Github external
for _ in range(TOTAL_CNT):
                    tasks.append(coro(srv.addr))

                self.loop.run_until_complete(asyncio.gather(*tasks))

            self.assertEqual(CNT, TOTAL_CNT)

        with self._silence_eof_received_warning():
            run(client)


class Test_UV_UnixSSL(_TestSSL, tb.UVTestCase):
    pass


class Test_AIO_UnixSSL(_TestSSL, tb.AIOTestCase):
    pass
github MagicStack / uvloop / tests / test_tasks.py View on Github external
def create_future(self):
        return asyncio.Future()

    def create_task(self, coro):
        return self.loop.create_task(coro)


class Test_UV_AIO_Tasks(_TestTasks, tb.UVTestCase):
    def create_future(self):
        return asyncio.Future()

    def create_task(self, coro):
        return asyncio.Task(coro)


class Test_AIO_Tasks(_TestTasks, tb.AIOTestCase):
    def create_future(self):
        return asyncio.Future()

    def create_task(self, coro):
        return asyncio.Task(coro)
github MagicStack / uvloop / tests / test_process.py View on Github external
with open(stdout.name, 'rb') as so:
                    self.assertEqual(so.read(), b'out\n')

                with open(stderr.name, 'rb') as se:
                    self.assertEqual(se.read(), b'err\n')


class Test_AIO_Process(_TestProcess, tb.AIOTestCase):
    pass


class TestAsyncio_UV_Process(_AsyncioTests, tb.UVTestCase):
    pass


class TestAsyncio_AIO_Process(_AsyncioTests, tb.AIOTestCase):
    pass


class Test_UV_Process_Delayed(tb.UVTestCase):

    class TestProto:
        def __init__(self):
            self.lost = 0
            self.stages = []

        def connection_made(self, transport):
            self.stages.append(('CM', transport))

        def pipe_data_received(self, fd, data):
            if fd == 1:
                self.stages.append(('STDOUT', data))
github MagicStack / uvloop / tests / test_context.py View on Github external
pass

        with self.assertRaisesRegex(NotImplementedError,
                                    'requires Python 3.7'):
            self.loop.call_soon(cb, context=1)

        with self.assertRaisesRegex(NotImplementedError,
                                    'requires Python 3.7'):
            self.loop.call_soon_threadsafe(cb, context=1)

        with self.assertRaisesRegex(NotImplementedError,
                                    'requires Python 3.7'):
            self.loop.call_later(0.1, cb, context=1)


class Test_AIO_Context(_ContextBaseTests, tb.AIOTestCase):
    pass
github MagicStack / uvloop / tests / test_signals.py View on Github external
def test_asyncio_add_watcher_SIGCHLD_nop(self):
        asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
        asyncio.get_event_loop_policy().get_child_watcher()

        try:
            loop = uvloop.new_event_loop()
            with self.assertWarnsRegex(
                    RuntimeWarning,
                    "asyncio is trying to install its ChildWatcher"):
                asyncio.set_event_loop(loop)
        finally:
            asyncio.set_event_loop(None)
            loop.close()


class Test_AIO_Signals(_TestSignal, tb.AIOTestCase):
    NEW_LOOP = 'asyncio.new_event_loop()'
github MagicStack / uvloop / tests / test_dns.py View on Github external
raise unittest.SkipTest

        async def run():
            fut = self.loop.create_task(
                self.loop.getaddrinfo('example.com', 80))
            await asyncio.sleep(0)
            fut.cancel()
            self.loop.stop()

        try:
            self.loop.run_until_complete(run())
        finally:
            self.loop.close()


class Test_AIO_DNS(BaseTestDNS, tb.AIOTestCase):
    pass
github MagicStack / uvloop / tests / test_futures.py View on Github external
def create_future(self):
        return self.loop.create_future()


class Test_UV_UV_Future(_TestFutures, tb.UVTestCase):
    # Test that uvloop.Future can be instantiated directly
    def create_future(self):
        return uvloop.Future(loop=self.loop)


class Test_UV_AIO_Futures(_TestFutures, tb.UVTestCase):
    def create_future(self):
        return asyncio.Future(loop=self.loop)


class Test_AIO_Futures(_TestFutures, tb.AIOTestCase):
    def create_future(self):
        return asyncio.Future(loop=self.loop)


class Test_UV_UV_FuturesCallbacks(_TestFuturesDoneCallbacks, tb.UVTestCase):
    def _new_future(self):
        return self.loop.create_future()


class Test_UV_AIO_FuturesCallbacks(_TestFuturesDoneCallbacks, tb.UVTestCase):
    def _new_future(self):
        return asyncio.Future(loop=self.loop)


class Test_AIO_FuturesCallbacks(_TestFuturesDoneCallbacks, tb.AIOTestCase):
    def _new_future(self):
github MagicStack / uvloop / tests / test_futures.py View on Github external
class Test_AIO_Futures(_TestFutures, tb.AIOTestCase):
    def create_future(self):
        return asyncio.Future(loop=self.loop)


class Test_UV_UV_FuturesCallbacks(_TestFuturesDoneCallbacks, tb.UVTestCase):
    def _new_future(self):
        return self.loop.create_future()


class Test_UV_AIO_FuturesCallbacks(_TestFuturesDoneCallbacks, tb.UVTestCase):
    def _new_future(self):
        return asyncio.Future(loop=self.loop)


class Test_AIO_FuturesCallbacks(_TestFuturesDoneCallbacks, tb.AIOTestCase):
    def _new_future(self):
        return asyncio.Future(loop=self.loop)