Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def main():
# Note that we don't need a loop variable anywhere!
asyncio.ensure_future(shutdown_waits_for(corofn()))
async def main():
try:
# This one is fast enough to finish
out = await shutdown_waits_for(corofn(sleep=0.01))
assert out is True
# This one is going to last longer than the shutdown
# but we can't get anything back out if that happens.
await shutdown_waits_for(corofn(sleep=0.03))
# main() gets cancelled here
await asyncio.sleep(2) # pragma: no cover.
# This append won't happen
items.append(True) # pragma: no cover.
except asyncio.CancelledError:
print("main got cancelled")
raise
async def main():
shutdown_waits_for(corofn()) # <-- Look Ma! No awaits!
async def main():
loop = asyncio.get_event_loop()
loop.create_task(shutdown_waits_for(corofn()))
async def main():
loop = asyncio.get_event_loop()
coro = corofn(sleep=0.2)
loop.call_later(0.1, direct_cancel)
with pytest.raises(asyncio.CancelledError):
await shutdown_waits_for(coro)