Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test():
    """Connect to a server whose only strong local reference was dropped.

    A server is created on a free port, kept only through a weak
    reference, and garbage collection is forced three times.  A client
    socket then connects and the test asserts that it receives
    ``b'hello'``.  Finally the weak reference is dereferenced again.

    NOTE(review): the visible excerpt stops right after the weak
    reference is dereferenced; whatever is done with the result is not
    shown here.
    """
    free_port = tb.find_free_port()
    server = await self.loop.create_server(Proto, '127.0.0.1', free_port)
    server_ref = weakref.ref(server)
    del server

    # Three back-to-back collection passes.
    for _ in range(3):
        gc.collect()

    with socket.socket(socket.AF_INET) as client:
        client.setblocking(False)
        await self.loop.sock_connect(client, ('127.0.0.1', free_port))
        reply = await self.loop.sock_recv(client, 100)
        self.assertEqual(reply, b'hello')

    server = server_ref()
async def test():
    """Send ``SIZE`` bytes to a fresh server and expect ``b'hello'`` back.

    The server is created on a free local port, a non-blocking client
    socket pushes ``b'a' * SIZE`` to it, and up to 100 bytes are read
    back and compared with ``b'hello'``.  The server is then closed and
    awaited until fully shut down.
    """
    free_port = tb.find_free_port()
    server = await self.loop.create_server(Proto, '127.0.0.1', free_port)

    with socket.socket(socket.AF_INET) as client:
        client.setblocking(False)
        await self.loop.sock_connect(client, ('127.0.0.1', free_port))
        payload = b'a' * SIZE
        await self.loop.sock_sendall(client, payload)
        reply = await self.loop.sock_recv(client, 100)
        self.assertEqual(reply, b'hello')

    server.close()
    await server.wait_closed()
# Helper coroutine: starts a server on a free local port, connects a
# client socket, sends a single byte and reads up to 100 bytes back.
#
# proto_factory: called once, up front, to build the protocol instance.
# exc_type, exc_re: not used in the visible part of this snippet
#   (presumably consumed by code that follows -- confirm against the
#   full file).
async def test(proto_factory, exc_type, exc_re):
port = tb.find_free_port()
# The factory lambda below returns this same, pre-built protocol
# instance for every connection the server accepts.
proto = proto_factory()
srv = await self.loop.create_server(
lambda: proto, '127.0.0.1', port)
try:
s = socket.socket(socket.AF_INET)
with s:
s.setblocking(False)
await self.loop.sock_connect(s, ('127.0.0.1', port))
await self.loop.sock_sendall(s, b'a')
d = await self.loop.sock_recv(s, 100)
# An empty read is turned into ConnectionResetError so that it takes
# the same path as a reset raised by the socket calls above.
if not d:
raise ConnectionResetError
except ConnectionResetError:
# A reset (or the empty read mapped to one above) is deliberately ignored.
pass
else:
# NOTE(review): the body of this ``else`` branch is not visible in this
# snippet; the excerpt is cut off here.
def test_create_server_6(self):
    """Check that two servers can listen on one port with ``reuse_port=True``.

    Raises:
        unittest.SkipTest: if the platform has no ``SO_REUSEPORT`` or the
            interpreter is older than 3.5.1 (no ``reuse_port`` argument).
    """
    if not hasattr(socket, 'SO_REUSEPORT'):
        raise unittest.SkipTest(
            'The system does not support SO_REUSEPORT')

    if sys.version_info[:3] < (3, 5, 1):
        raise unittest.SkipTest(
            'asyncio in CPython 3.5.0 does not have the '
            'reuse_port argument')

    port = tb.find_free_port()

    async def runner():
        # Both servers bind the very same port; the second call only
        # succeeds when reuse_port is honoured.
        srv1 = await self.loop.create_server(
            asyncio.Protocol,
            None, port,
            reuse_port=True)

        srv2 = await self.loop.create_server(
            asyncio.Protocol,
            None, port,
            reuse_port=True)

        srv1.close()
        srv2.close()

        # Wait for *both* servers to finish shutting down, not just the
        # first one.
        await srv1.wait_closed()
        await srv2.wait_closed()

    # Actually drive the coroutine; without this the test body never runs.
    self.loop.run_until_complete(runner())
def test_create_server_5(self):
    """Create and close a server bound with ``host=None`` on a free port.

    Intended to verify that create_server sets the IPv6-only socket flag
    (called TCP_IPV6ONLY in the original note; presumably the
    IPV6_V6ONLY option -- confirm) so that it can bind to IPv4 and IPv6
    addresses simultaneously.
    """
    free_port = tb.find_free_port()

    async def open_and_close():
        server = await self.loop.create_server(
            asyncio.Protocol, None, free_port)
        server.close()
        await server.wait_closed()

    self.loop.run_until_complete(open_and_close())