How to use the uvloop._testbase module in uvloop

To help you get started, we’ve selected a few uvloop examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github MagicStack / uvloop / tests / test_executors.py View on Github external
    @unittest.skipIf(
        multiprocessing.get_start_method(False) == 'spawn',
        'no need to test on macOS where spawn is used instead of fork')
    def test_executors_process_pool_01(self):
        self.run_pool_test(concurrent.futures.ProcessPoolExecutor)

    def test_executors_process_pool_02(self):
        self.run_pool_test(concurrent.futures.ThreadPoolExecutor)


class TestUVExecutors(_TestExecutors, tb.UVTestCase):
    """Executor test cases from ``_TestExecutors`` on ``tb.UVTestCase``."""


class TestAIOExecutors(_TestExecutors, tb.AIOTestCase):
    """Executor test cases from ``_TestExecutors`` on ``tb.AIOTestCase``."""
github MagicStack / uvloop / tests / test_regr1.py View on Github external
loop.run_until_complete(server.wait_closed())
        try:
            loop.close()
        except Exception as exc:
            print(exc)
        qout.put('stopped')

    thread = threading.Thread(target=server_thread, daemon=True)
    thread.start()

    quin.get()
    server_loop.call_soon_threadsafe(server_loop.stop)
    thread.join(1)


class TestIssue39Regr(tb.UVTestCase):
    """See https://github.com/MagicStack/uvloop/issues/39 for details.

    Original code to reproduce the bug is by Jim Fulton.
    """

    def on_alarm(self, sig, fr):
        # Fail the test if the callback fires while ``self.running`` is
        # still truthy.  ``sig`` and ``fr`` are accepted but unused
        # (presumably the signal-handler signature -- confirm at the
        # registration site).
        if not self.running:
            return
        raise FailedTestError

    def run_test(self):
        try:
            for i in range(10):
                for threaded in [True, False]:
                    if threaded:
                        qin, qout = queue.Queue(), queue.Queue()
                        threading.Thread(
github MagicStack / uvloop / tests / test_dns.py View on Github external
self._test_getnameinfo(('127.0.0.1', 80), 0)

    def test_getnameinfo_2(self):
        self._test_getnameinfo(('127.0.0.1', 80, 1231231231213), 0)

    def test_getnameinfo_3(self):
        self._test_getnameinfo(('127.0.0.1', 80, 0, 0), 0)

    def test_getnameinfo_4(self):
        self._test_getnameinfo(('::1', 80), 0)

    def test_getnameinfo_5(self):
        self._test_getnameinfo(('localhost', 8080), 0)


class Test_UV_DNS(BaseTestDNS, tb.UVTestCase):

    def test_getaddrinfo_close_loop(self):
        # Test that we can close the loop with a running
        # DNS query.

        try:
            # Check that we have internet connection
            socket.getaddrinfo('example.com', 80)
        except socket.error:
            raise unittest.SkipTest

        async def run():
            fut = self.loop.create_task(
                self.loop.getaddrinfo('example.com', 80))
            await asyncio.sleep(0)
            fut.cancel()
github MagicStack / uvloop / tests / test_pipes.py View on Github external
data = bytearray()

        def reader(data):
            # Pull up to 1024 bytes from ``rpipe`` and append them to
            # ``data`` in place; when the read would block, ``data`` is
            # left untouched.  Always returns the current length of
            # ``data`` so callers can poll until enough has arrived.
            try:
                received = os.read(rpipe, 1024)
            except BlockingIOError:
                pass
            else:
                data += received
            return len(data)

        tb.run_until(self.loop, lambda: reader(data) >= 1)
        self.assertEqual(b'1', data)

        transport.write(b'2345')
        tb.run_until(self.loop, lambda: reader(data) >= 5)
        self.assertEqual(b'12345', data)
        self.assertEqual('CONNECTED', proto.state)

        os.close(rpipe)

        # extra info is available
        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))

        # close connection
        proto.transport.close()
        self.loop.run_until_complete(proto.done)
        self.assertEqual('CLOSED', proto.state)
github MagicStack / uvloop / tests / test_sockets.py View on Github external
    @unittest.skipUnless(tb.has_IPv6, 'no IPv6')
    def test_socket_ipv6_addr(self):
        server_sock = socket.socket(socket.AF_INET6)
        with server_sock:
            server_sock.bind(('::1', 0))

            addr = server_sock.getsockname()  # tuple of 4 elements for IPv6

            async def run():
                sock = socket.socket(socket.AF_INET6)
                with sock:
                    sock.setblocking(False)
                    # Check that sock_connect accepts 4-element address tuple
                    # for IPv6 sockets.
                    f = self.loop.sock_connect(sock, addr)
                    try:
                        await asyncio.wait_for(f, timeout=0.1)