Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test():
context = aiomisc.get_context()
context['foo'] = True
await asyncio.sleep(0.1)
results.append(await context['foo'])
context['foo'] = False
await asyncio.sleep(0.1)
results.append(await context['foo'])
context['foo'] = None
await asyncio.sleep(0.1)
results.append(await context['foo'])
with aiomisc.entrypoint() as loop:
loop.run_until_complete(test())
assert results == [True, False, None]
def test_service_start_event():
class Sleeper(aiomisc.Service):
result = False
async def start(self):
self.start_event.set()
await asyncio.sleep(86400)
Sleeper.result = True
with aiomisc.entrypoint(Sleeper()):
pass
assert not Sleeper.result
async def coro():
nonlocal results
await asyncio.sleep(0.5)
results.append(True)
async def main(loop):
task = loop.create_task(coro())
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
finally:
await asyncio.sleep(1)
with aiomisc.entrypoint() as loop:
loop.run_until_complete(main(loop))
assert results == [True]
DATA = []
async def handle_client(self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
self.DATA.append(await reader.readline())
writer.close()
service = TestService('127.0.0.1', aiomisc_unused_port)
@aiomisc.threaded
def writer():
port = aiomisc_unused_port
with socket.create_connection(('127.0.0.1', port)) as sock:
sock.send(b'hello server\n')
with aiomisc.entrypoint(service) as loop:
loop.run_until_complete(writer())
assert TestService.DATA
assert TestService.DATA == [b'hello server\n']
def test_service_no_start_event():
class Sleeper(aiomisc.Service):
result = False
async def start(self):
await asyncio.sleep(1)
Sleeper.result = True
with aiomisc.entrypoint(Sleeper()):
pass
assert Sleeper.result
svc = CountPeriodicService(interval=0.1)
async def assert_counter():
nonlocal counter, svc
counter = 0
await asyncio.sleep(0.5)
assert 4 <= counter <= 7
await svc.stop(None)
await asyncio.sleep(0.5)
assert 4 <= counter <= 7
with aiomisc.entrypoint(svc) as loop:
loop.run_until_complete(assert_counter())
@aiomisc.threaded
def writer():
with ExitStack() as stack:
sock = stack.enter_context(
socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
)
ssock = stack.enter_context(ssl_client_context.wrap_socket(
sock, server_hostname='localhost'
))
ssock.connect(('127.0.0.1', aiomisc_unused_port))
ssock.send(b'hello server\n')
with aiomisc.entrypoint(service) as loop:
loop.run_until_complete(writer())
assert TestService.DATA
assert TestService.DATA == [b'hello server\n']
svc = CountPeriodicService(interval=0.1, delay=0.5)
async def assert_counter():
nonlocal counter, svc
counter = 0
await asyncio.sleep(0.25)
assert not counter
await asyncio.sleep(0.5)
await svc.stop(None)
assert 1 < counter < 4
with aiomisc.entrypoint(svc) as loop:
loop.run_until_complete(assert_counter())
def test_post_stop_signal(loop):
ep = entrypoint(FooService(), loop=loop)
called = False
async def post_stop_callback(entrypoint):
nonlocal called
await asyncio.sleep(0.01)
called = True
ep.post_stop.connect(post_stop_callback)
with ep:
assert not called
assert called
def test_aiohttp_service(aiomisc_unused_port):
@aiomisc.threaded
def http_client():
conn = http.client.HTTPConnection(
host='127.0.0.1',
port=aiomisc_unused_port,
timeout=1
)
conn.request('GET', '/')
response = conn.getresponse()
return response.code
service = AIOHTTPTestApp(address='127.0.0.1', port=aiomisc_unused_port)
with aiomisc.entrypoint(service) as loop:
response = loop.run_until_complete(http_client())
assert response == 404