Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@unittest.skipIf(
    multiprocessing.get_start_method(allow_none=False) == 'spawn',
    'no need to test on macOS where spawn is used instead of fork',
)
def test_executors_process_pool_01(self):
    # Skipped whenever the multiprocessing start method resolves to
    # 'spawn'; otherwise the shared ``run_pool_test`` helper is driven
    # with a process-based executor class.
    executor_cls = concurrent.futures.ProcessPoolExecutor
    self.run_pool_test(executor_cls)
def test_executors_process_pool_02(self):
    # NOTE: despite "process_pool" in the test name, this variant hands
    # the thread-based executor class to the shared ``run_pool_test``
    # helper.
    executor_cls = concurrent.futures.ThreadPoolExecutor
    self.run_pool_test(executor_cls)
class TestUVExecutors(_TestExecutors, tb.UVTestCase):
    """Combine the ``_TestExecutors`` tests with ``tb.UVTestCase``.

    Adds no members of its own; everything is inherited from the bases.
    """
class TestAIOExecutors(_TestExecutors, tb.AIOTestCase):
    """Combine the ``_TestExecutors`` tests with ``tb.AIOTestCase``.

    Adds no members of its own; everything is inherited from the bases.
    """
loop.run_until_complete(server.wait_closed())
try:
loop.close()
except Exception as exc:
print(exc)
qout.put('stopped')
thread = threading.Thread(target=server_thread, daemon=True)
thread.start()
quin.get()
server_loop.call_soon_threadsafe(server_loop.stop)
thread.join(1)
class TestIssue39Regr(tb.UVTestCase):
"""See https://github.com/MagicStack/uvloop/issues/39 for details.
Original code to reproduce the bug is by Jim Fulton.
"""
def on_alarm(self, sig, fr):
    # Raises FailedTestError when ``self.running`` is still truthy at the
    # time of the call; otherwise does nothing. ``sig`` and ``fr`` are
    # accepted but unused.
    # NOTE(review): the (sig, fr) signature looks like a ``signal``
    # handler callback -- confirm against where this is registered.
    if not self.running:
        return
    raise FailedTestError
def run_test(self):
try:
for i in range(10):
for threaded in [True, False]:
if threaded:
qin, qout = queue.Queue(), queue.Queue()
threading.Thread(
self._test_getnameinfo(('127.0.0.1', 80), 0)
def test_getnameinfo_2(self):
    # Three-element address tuple whose last item is a very large
    # integer, forwarded to the shared ``_test_getnameinfo`` helper.
    # NOTE(review): the trailing 0 is presumably the getnameinfo
    # ``flags`` argument -- confirm against the helper.
    address = ('127.0.0.1', 80, 1231231231213)
    self._test_getnameinfo(address, 0)
def test_getnameinfo_3(self):
    # Four-element address tuple (two trailing zeros) paired with an
    # IPv4 literal, forwarded to the shared ``_test_getnameinfo`` helper.
    # NOTE(review): the trailing 0 is presumably the getnameinfo
    # ``flags`` argument -- confirm against the helper.
    address = ('127.0.0.1', 80, 0, 0)
    self._test_getnameinfo(address, 0)
def test_getnameinfo_4(self):
    # Two-element address tuple using the '::1' host literal, forwarded
    # to the shared ``_test_getnameinfo`` helper.
    # NOTE(review): the trailing 0 is presumably the getnameinfo
    # ``flags`` argument -- confirm against the helper.
    address = ('::1', 80)
    self._test_getnameinfo(address, 0)
def test_getnameinfo_5(self):
    # Two-element address tuple using a host *name* ('localhost') rather
    # than a numeric literal, forwarded to the shared
    # ``_test_getnameinfo`` helper.
    # NOTE(review): the trailing 0 is presumably the getnameinfo
    # ``flags`` argument -- confirm against the helper.
    address = ('localhost', 8080)
    self._test_getnameinfo(address, 0)
class Test_UV_DNS(BaseTestDNS, tb.UVTestCase):
def test_getaddrinfo_close_loop(self):
# Test that we can close the loop with a running
# DNS query.
try:
# Check that we have internet connection
socket.getaddrinfo('example.com', 80)
except socket.error:
raise unittest.SkipTest
async def run():
fut = self.loop.create_task(
self.loop.getaddrinfo('example.com', 80))
await asyncio.sleep(0)
fut.cancel()
data = bytearray()
def reader(data):
try:
chunk = os.read(rpipe, 1024)
except BlockingIOError:
return len(data)
data += chunk
return len(data)
tb.run_until(self.loop, lambda: reader(data) >= 1)
self.assertEqual(b'1', data)
transport.write(b'2345')
tb.run_until(self.loop, lambda: reader(data) >= 5)
self.assertEqual(b'12345', data)
self.assertEqual('CONNECTED', proto.state)
os.close(rpipe)
# extra info is available
self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
# close connection
proto.transport.close()
self.loop.run_until_complete(proto.done)
self.assertEqual('CLOSED', proto.state)
@unittest.skipUnless(tb.has_IPv6, 'no IPv6')
def test_socket_ipv6_addr(self):
server_sock = socket.socket(socket.AF_INET6)
with server_sock:
server_sock.bind(('::1', 0))
addr = server_sock.getsockname() # tuple of 4 elements for IPv6
async def run():
sock = socket.socket(socket.AF_INET6)
with sock:
sock.setblocking(False)
# Check that sock_connect accepts 4-element address tuple
# for IPv6 sockets.
f = self.loop.sock_connect(sock, addr)
try:
await asyncio.wait_for(f, timeout=0.1)