Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""Basic SIGTERM"""
# NOTE(review): the enclosing test's `def` line is not visible in this
# excerpt, so the statements below are laid out as a flat sequence.

# Every background task that main() spawns, kept so the final assertion
# can inspect their state once run() has returned.
created_tasks = []


async def background_task():
    """Sleep for 10 seconds, much longer than main() runs before raising."""
    await asyncio.sleep(10)


async def main():
    """Spawn ten background tasks, pause briefly, then raise."""
    # Inside a coroutine get_event_loop() returns the running loop;
    # asyncio.get_running_loop() is the Python 3.7+ spelling of the same thing.
    loop = asyncio.get_event_loop()
    created_tasks.extend(loop.create_task(background_task()) for _ in range(10))
    # Short pause before failing, so the new tasks get a chance to start.
    await asyncio.sleep(0.01)
    raise Exception("Stops the loop")


# With stop_on_unhandled_errors=True the exception raised inside main()
# is expected to surface from run() itself.
with pytest.raises(Exception) as excinfo:
    run(main(), stop_on_unhandled_errors=True)

print(excinfo.traceback)
assert "Stops the loop" in str(excinfo.value)
# `Task.cancelled` is a method. The un-called bound method is always
# truthy, which made this assertion pass unconditionally; call it so the
# check really verifies that every background task was cancelled.
assert all(t.cancelled() for t in created_tasks)
out = await shutdown_waits_for(corofn(sleep=0.01))
assert out is True
# This one is going to last longer than the shutdown
# but we can't get anything back out if that happens.
await shutdown_waits_for(corofn(sleep=0.03))
# main() gets cancelled here
await asyncio.sleep(2) # pragma: no cover.
# This append won't happen
items.append(True) # pragma: no cover.
except asyncio.CancelledError:
print("main got cancelled")
raise
kill(SIGTERM, after=0.02)
run(main())
assert len(items) == 2
def test_sigterm():
    """Send SIGTERM while `run()` drives a caller-supplied loop.

    After `run()` returns, the loop that was passed in must still be open.
    """

    async def long_sleep():
        # Five seconds: the test relies on the signal, not on this
        # coroutine finishing, to bring run() back.
        await asyncio.sleep(5.0)

    kill(SIGTERM)
    supplied_loop = newloop()
    run(long_sleep(), loop=supplied_loop)

    loop_was_closed = supplied_loop.is_closed()
    assert not loop_was_closed
def test_sigterm_enduring_ensure_future():
    """Schedule a `shutdown_waits_for()` coroutine with `ensure_future()`.

    The wrapped coroutine sleeps for 0.02s while SIGTERM is requested
    after 0.01s; the test passes only if the coroutine still reached its
    `append`.
    """
    finished = []

    async def slow_append():
        await asyncio.sleep(0.02)
        finished.append(True)

    async def entry_point():
        # No explicit loop object is needed for scheduling here.
        protected = shutdown_waits_for(slow_append())
        asyncio.ensure_future(protected)

    kill(SIGTERM, after=0.01)
    run(entry_point())

    assert finished
def test_sigint():
    """Smoke test: `run()` with no coroutine, with SIGINT requested first."""
    # The signal is requested before run() is entered, as in test_sigterm.
    kill(SIGINT)

    # No coroutine is passed; the test simply expects this call to return.
    run()
def test_exe():
    """Smoke test: `run()` accepts a caller-provided executor."""
    # Build the pool first, then request the signal, then enter run().
    thread_pool = ThreadPoolExecutor()
    kill(SIGTERM)

    # No coroutine is passed; only the custom executor is handed over.
    run(executor=thread_pool)
def test_sigterm_enduring_create_task():
    """Schedule a `shutdown_waits_for()` coroutine with `loop.create_task()`.

    The wrapped coroutine sleeps for 0.04s while SIGTERM is requested
    after 0.02s; the test passes only if the coroutine still reached its
    `append`.
    """
    finished = []

    async def delayed_marker():
        await asyncio.sleep(0.04)
        finished.append(True)

    async def entry_point():
        # Fetch the loop and schedule the protected coroutine in one step.
        asyncio.get_event_loop().create_task(
            shutdown_waits_for(delayed_marker())
        )

    kill(SIGTERM, after=0.02)
    run(entry_point())

    assert finished
def test_uvloop():
    """Smoke test: `run()` with `use_uvloop=True`.

    No signal is sent here; the coroutine stops the event loop itself.
    """

    async def stop_soon():
        # Yield to the loop once, then ask that loop to stop.
        await asyncio.sleep(0)
        current_loop = asyncio.get_event_loop()
        current_loop.stop()

    run(stop_soon(), use_uvloop=True)
the hidden Task inside). However, there will be a RuntimeWarning
that the coroutine returned from `shutdown_waits_for()` was never
awaited. Therefore, maybe best not to use this, just to avoid
confusion."""
items = []
async def corofn():
await asyncio.sleep(0.02)
items.append(True)
async def main():
shutdown_waits_for(corofn()) # <-- Look Ma! No awaits!
kill(SIGTERM, after=0.01)
run(main())
assert items
broker = Server(
auth=auth,
exporter=args.exporter,
name=args.name,
)
if args.bind or not args.endpoint:
bind = args.bind or '0.0.0.0:20000'
broker.add_endpoint_legacy(bind, tlscert=args.tlscert, tlskey=args.tlskey)
if args.endpoint:
for endpoint in args.endpoint:
broker.add_endpoint_str(endpoint)
return aiorun.run(broker.serve_forever())