Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@aiomisc.threaded
def http_client():
    """Send ``GET /`` to the local server and return the response code.

    Connects to 127.0.0.1 on ``aiomisc_unused_port`` (taken from the
    enclosing scope) with ``timeout=1``. The connection is closed before
    returning, including when the request or the response read fails, so
    the socket is not leaked.
    """
    conn = http.client.HTTPConnection(
        host='127.0.0.1',
        port=aiomisc_unused_port,
        timeout=1
    )
    try:
        conn.request('GET', '/')
        response = conn.getresponse()
        # Only the numeric code is needed; the body is never read.
        return response.code
    finally:
        # HTTPConnection is not a context manager, so close it explicitly.
        conn.close()
@pytest.fixture(params=(aiomisc.threaded, aiomisc.threaded_separate))
def threaded_decorator(request, executor: aiomisc.ThreadPoolExecutor):
    """Yield each threading decorator in turn via fixture parametrization.

    The ``executor`` fixture is requested and asserted truthy before the
    current parameter (``aiomisc.threaded`` or ``aiomisc.threaded_separate``)
    is handed to the test.
    """
    assert executor
    selected_decorator = request.param
    return selected_decorator
@aiomisc.threaded
def writer():
    """Connect to the local server through ``ssl_client_context`` and send
    a single greeting line.

    A TCP socket is created, wrapped with ``ssl_client_context`` using
    ``server_hostname='localhost'``, connected to 127.0.0.1 on
    ``aiomisc_unused_port`` and used to send ``b'hello server\\n'``.
    Both sockets are closed when the block exits, wrapped socket first.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as sock:
        with ssl_client_context.wrap_socket(
            sock, server_hostname='localhost'
        ) as ssock:
            ssock.connect(('127.0.0.1', aiomisc_unused_port))
            # sendall() keeps writing until the whole payload is sent;
            # plain send() is allowed to transmit only part of it.
            ssock.sendall(b'hello server\n')
# Test: schedules 1000 tasks that each call time.sleep(1) through
# aiomisc.threaded, waits one second on the event loop, cancels every task
# and finally shuts the executor down with wait=True.
# NOTE(review): indentation was lost in this copy, so which statements sit
# inside the `timeout(2)` / `timer(1, dispersion=2)` blocks cannot be
# determined from here — confirm against the original file.
async def test_cancel(executor, loop, timer):
# Wrap the blocking time.sleep so that sleep(1) can be passed to
# loop.create_task below.
sleep = aiomisc.threaded(time.sleep)
# presumably an upper bound of 2 seconds for the enclosed body — TODO confirm
# which `timeout` helper this is.
async with timeout(2):
# presumably checks the elapsed time against 1 with the given dispersion —
# verify against the `timer` fixture.
with timer(1, dispersion=2):
tasks = [loop.create_task(sleep(1)) for _ in range(1000)]
await asyncio.sleep(1)
# Cancel every task created above, whatever state it is in by now.
for task in tasks:
task.cancel()
executor.shutdown(wait=True)
@aiomisc.threaded
def arange(*args):
    """Generator yielding every value of ``range(*args)`` in order.

    ``args`` are forwarded unchanged to ``range``. The generator finishes
    with no return value once the range is exhausted.
    """
    for number in range(*args):
        yield number