Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'Python version must be < 3.7')
def test_transport_unclosed_warning(self):
# Expects a ResourceWarning whose message matches 'unclosed' to be raised
# while a UNIX connection is built on one end of a socket pair and the
# resulting transport is never explicitly closed.
#
# Helper coroutine: hands an already-created socket to
# create_unix_connection (the path argument is None because ``sock`` is
# supplied) and returns whatever that call returns.
async def test(sock):
return await self.loop.create_unix_connection(
asyncio.Protocol,
None,
sock=sock)
with self.assertWarnsRegex(ResourceWarning, 'unclosed'):
s1, s2 = socket.socketpair(socket.AF_UNIX)
# Both ends of the pair are closed when this ``with`` exits.
with s1, s2:
self.loop.run_until_complete(test(s1))
# NOTE(review): indentation was lost in this excerpt; presumably the loop
# is closed inside the assertWarnsRegex block - confirm against the file.
self.loop.close()
class Test_UV_Unix(_TestUnix, tb.UVTestCase):
@unittest.skipUnless(hasattr(os, 'fspath'), 'no os.fspath()')
def test_create_unix_connection_pathlib(self):
    """Open a UNIX connection using a ``pathlib.Path`` as the address.

    A helper server is started through ``self.unix_server``; its address is
    wrapped in ``pathlib.Path`` and passed to ``create_unix_connection``.
    The transport is closed as soon as the connection is established.
    """
    async def connect_and_close(path):
        transport, _protocol = await self.loop.create_unix_connection(
            asyncio.Protocol, path)
        transport.close()

    def serve(sock):
        # The server side only idles briefly; it never touches ``sock``.
        time.sleep(0.01)

    with self.unix_server(serve) as srv:
        target = pathlib.Path(srv.addr)
        self.loop.run_until_complete(connect_and_close(target))
@unittest.skipUnless(hasattr(os, 'fspath'), 'no os.fspath()')
def test_create_unix_server_pathlib(self):
with self.unix_sock_name() as srv_path:
srv_path = pathlib.Path(srv_path)
import asyncio
import subprocess
import sys
from uvloop import _testbase as tb
class TestDealloc(tb.UVTestCase):
# The visible part of the test runs PROG in a child interpreter
# (sys.executable -W ignore -c PROG) and checks that stderr is empty and
# that b'True' appears on stdout.
def test_dealloc_1(self):
# Somewhere between Cython 0.25.2 and 0.26.0 uvloop programs
# started to trigger the following output:
#
# $ python prog.py
# Error in sys.excepthook:
#
# Original exception was:
#
# Upon some debugging, it appeared that Handle.__dealloc__ was
# called at a time where some CPython objects become non-functional,
# and any exception in __dealloc__ caused CPython to output the
# above.
#
# This regression test starts an event loop in debug mode,
#
# NOTE(review): this excerpt appears to be cut here - the rest of the
# sentence above, the definition of PROG and the header of the ``runner``
# coroutine are not visible before the closing triple quote below.
# Confirm against the full file before editing this test.
"""
proc = await asyncio.create_subprocess_exec(
sys.executable, b'-W', b'ignore', b'-c', PROG,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = await proc.communicate()
self.assertEqual(err, b'')
self.assertIn(b'True', out)
self.loop.run_until_complete(runner())
class Test_UV_Signals(_TestSignal, tb.UVTestCase):
NEW_LOOP = 'uvloop.new_event_loop()'
def test_signals_no_SIGCHLD(self):
    """Adding a handler for ``SIGCHLD`` must raise ``RuntimeError``."""
    def ignore(*a):
        return None

    expected_message = r"cannot add.*handler.*SIGCHLD"
    with self.assertRaisesRegex(RuntimeError, expected_message):
        self.loop.add_signal_handler(signal.SIGCHLD, ignore)
@unittest.skipIf(sys.version_info[:3] >= (3, 8, 0),
'in 3.8 a ThreadedChildWatcher is used '
'(does not rely on SIGCHLD)')
def test_asyncio_add_watcher_SIGCHLD_nop(self):
asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
asyncio.get_event_loop_policy().get_child_watcher()
try:
import asyncio
from uvloop._testbase import UVTestCase
class TestCythonIntegration(UVTestCase):
    """Checks how asyncio treats the coroutine exported by ``uvloop.loop``."""

    def test_cython_coro_is_coroutine(self):
        """``_test_coroutine_1`` must look like a coroutine to asyncio.

        Verifies its formatted representation, ``__qualname__`` and
        ``__name__``, that ``asyncio.iscoroutine`` accepts it, and that
        ``asyncio.ensure_future`` wraps it in a ``Task``.
        """
        from uvloop.loop import _test_coroutine_1
        from asyncio.coroutines import _format_coroutine

        coro = _test_coroutine_1()

        self.assertTrue(
            _format_coroutine(coro).startswith('_test_coroutine_1() done'))
        self.assertEqual(_test_coroutine_1.__qualname__, '_test_coroutine_1')
        self.assertEqual(_test_coroutine_1.__name__, '_test_coroutine_1')
        self.assertTrue(asyncio.iscoroutine(coro))

        fut = asyncio.ensure_future(coro)
        # assertIsInstance reports the offending type on failure, unlike
        # assertTrue(isinstance(...)) which only says "False is not true".
        self.assertIsInstance(fut, asyncio.Future)
        self.assertIsInstance(fut, asyncio.Task)
        fut.cancel()

        # Run the cancelled task to completion so it is not left pending
        # when the loop is torn down.
        # NOTE(review): assumes the fixture makes ``self.loop`` the current
        # event loop used by ensure_future above - confirm in _testbase.
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(fut)
async def stop():
    """Clean up ``runner`` after a short delay, then reap ``client_task``.

    Sleeps 0.1 s, awaits ``runner.cleanup()`` with a 0.1 s timeout and,
    whether or not that succeeds, cancels ``client_task`` and awaits it,
    ignoring the resulting ``CancelledError``.
    """
    await asyncio.sleep(0.1)
    try:
        cleanup = runner.cleanup()
        await asyncio.wait_for(cleanup, timeout=0.1)
    finally:
        client_task.cancel()
        try:
            await client_task
        except asyncio.CancelledError:
            pass
self.loop.run_until_complete(stop())
@unittest.skipIf(skip_tests, "no aiohttp module")
class Test_UV_AioHTTP(_TestAioHTTP, tb.UVTestCase):
    """Run the shared ``_TestAioHTTP`` cases on top of ``tb.UVTestCase``."""
@unittest.skipIf(skip_tests, "no aiohttp module")
class Test_AIO_AioHTTP(_TestAioHTTP, tb.AIOTestCase):
    """Run the shared ``_TestAioHTTP`` cases on top of ``tb.AIOTestCase``."""
def create_future(self):
    """Return a new future produced by ``self.loop.create_future()``."""
    loop = self.loop
    return loop.create_future()
def create_task(self, coro):
    """Wrap *coro* in a task via ``self.loop.create_task`` and return it."""
    task = self.loop.create_task(coro)
    return task
class Test_UV_UV_Tasks_AIO_Future(_TestTasks, tb.UVTestCase):
    """``_TestTasks`` with loop-created tasks and ``asyncio.Future`` futures."""

    def create_future(self):
        """Instantiate ``asyncio.Future`` directly, without a loop argument."""
        future = asyncio.Future()
        return future

    def create_task(self, coro):
        """Let ``self.loop`` turn *coro* into a task."""
        task = self.loop.create_task(coro)
        return task
class Test_UV_AIO_Tasks(_TestTasks, tb.UVTestCase):
    """``_TestTasks`` on ``tb.UVTestCase`` using asyncio's Future and Task."""

    def create_future(self):
        """Instantiate ``asyncio.Future`` directly, without a loop argument."""
        future = asyncio.Future()
        return future

    def create_task(self, coro):
        """Wrap *coro* by instantiating ``asyncio.Task`` directly."""
        task = asyncio.Task(coro)
        return task
class Test_AIO_Tasks(_TestTasks, tb.AIOTestCase):
    """``_TestTasks`` on ``tb.AIOTestCase`` using asyncio's Future and Task."""

    def create_future(self):
        """Instantiate ``asyncio.Future`` directly, without a loop argument."""
        future = asyncio.Future()
        return future

    def create_task(self, coro):
        """Wrap *coro* by instantiating ``asyncio.Task`` directly."""
        task = asyncio.Task(coro)
        return task
# Sets the context variable ``cvar`` to ``tracked`` and then schedules a
# no-op callback on the loop with call_soon.
# NOTE(review): ``cvar`` and ``tracked`` come from the enclosing test, which
# is not visible in this excerpt; the code that follows deletes ``tracked``
# and asserts a reference is gone, so keep the statements exactly as written.
async def sub():
cvar.set(tracked) # NoQA
self.loop.call_soon(lambda: None)
# Runs ``sub`` as its own task on the loop, waits for it to finish, then
# sleeps for 10 ms before returning.
async def main():
await self.loop.create_task(sub())
await asyncio.sleep(0.01)
task = self.loop.create_task(main())
self.loop.run_until_complete(task)
del tracked
self.assertIsNone(ref())
class Test_UV_Context(_ContextBaseTests, tb.UVTestCase):
@unittest.skipIf(PY37, 'requires Python <3.6')
def test_context_arg(self):
def cb():
pass
with self.assertRaisesRegex(NotImplementedError,
'requires Python 3.7'):
self.loop.call_soon(cb, context=1)
with self.assertRaisesRegex(NotImplementedError,
'requires Python 3.7'):
self.loop.call_soon_threadsafe(cb, context=1)
with self.assertRaisesRegex(NotImplementedError,
'requires Python 3.7'):
# The server can send the data at any time, even before
# the previous result future has been cancelled.
await self.loop.sock_sendall(srv_sock_conn, b'1')
for rfut in pending_read_futs:
rfut.cancel()
data = await self.loop.sock_recv(sock_client, 1)
self.assertEqual(data, b'1')
self.loop.run_until_complete(server())
class TestUVSockets(_TestSockets, tb.UVTestCase):
@unittest.skipUnless(hasattr(select, 'epoll'), 'Linux only test')
def test_socket_sync_remove(self):
# See https://github.com/MagicStack/uvloop/issues/61 for details
sock = socket.socket()
epoll = select.epoll.fromfd(self.loop._get_backend_id())
try:
cb = lambda: None
sock.bind(('127.0.0.1', 0))
sock.listen(0)
fd = sock.fileno()
self.loop.add_reader(fd, cb)
self.loop.run_until_complete(asyncio.sleep(0.01))
###############################################################################
class Test_UV_UV_create_future(_TestFutures, tb.UVTestCase):
    """Run ``_TestFutures`` against ``uvloop.Loop.create_future``."""

    def create_future(self):
        """Ask the loop under test for a new future."""
        loop = self.loop
        return loop.create_future()
class Test_UV_UV_Future(_TestFutures, tb.UVTestCase):
    """Test that ``uvloop.Future`` can be instantiated directly."""

    def create_future(self):
        """Construct ``uvloop.Future`` bound to ``self.loop``."""
        future_cls = uvloop.Future
        return future_cls(loop=self.loop)
class Test_UV_AIO_Futures(_TestFutures, tb.UVTestCase):
    """Run ``_TestFutures`` on ``tb.UVTestCase`` with ``asyncio.Future``."""

    def create_future(self):
        """Construct ``asyncio.Future`` bound to ``self.loop``."""
        future = asyncio.Future(loop=self.loop)
        return future
class Test_AIO_Futures(_TestFutures, tb.AIOTestCase):
    """Run ``_TestFutures`` on ``tb.AIOTestCase`` with ``asyncio.Future``."""

    def create_future(self):
        """Construct ``asyncio.Future`` bound to ``self.loop``."""
        future = asyncio.Future(loop=self.loop)
        return future
class Test_UV_UV_FuturesCallbacks(_TestFuturesDoneCallbacks, tb.UVTestCase):
    """Run ``_TestFuturesDoneCallbacks`` with futures created by the loop."""

    def _new_future(self):
        """Return a fresh future from ``self.loop.create_future()``."""
        loop = self.loop
        return loop.create_future()
class Test_UV_AIO_FuturesCallbacks(_TestFuturesDoneCallbacks, tb.UVTestCase):
def _new_future(self):
# method.
await waiter().asend(None)
self.loop.run_until_complete(foo())
self.loop.run_until_complete(asyncio.sleep(0.01))
def test_inf_wait_for(self):
    """``asyncio.wait_for`` with an infinite timeout returns the result.

    The awaited coroutine sleeps for 0.1 s and returns 123; the loop must
    hand that value back unchanged.
    """
    async def delayed_answer():
        await asyncio.sleep(0.1)
        return 123

    bounded = asyncio.wait_for(delayed_answer(), timeout=float('inf'))
    outcome = self.loop.run_until_complete(bounded)
    self.assertEqual(outcome, 123)
class TestBaseUV(_TestBase, UVTestCase):
def test_loop_create_future(self):
    """``loop.create_future()`` returns an asyncio future bound to the loop.

    The future must be an ``asyncio.Future`` instance whose ``_loop`` is
    the loop that created it; it is cancelled at the end of the test.
    """
    fut = self.loop.create_future()
    # assertIsInstance reports the actual type on failure, unlike
    # assertTrue(isinstance(...)) which only says "False is not true".
    self.assertIsInstance(fut, asyncio.Future)
    self.assertIs(fut._loop, self.loop)
    fut.cancel()
def test_loop_call_soon_handle_cancelled(self):
cb = lambda: False # NoQA
handle = self.loop.call_soon(cb)
self.assertFalse(handle.cancelled())
handle.cancel()
self.assertTrue(handle.cancelled())
handle = self.loop.call_soon(cb)
self.assertFalse(handle.cancelled())