Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
done: Set[Awaitable[T]] = set()
pending: Set[Awaitable[T]] = set(aws)
remaining: Optional[float] = None
if timeout and timeout > 0:
threshold = time.time() + timeout
else:
timeout = None
while pending:
if timeout:
remaining = threshold - time.time()
if remaining <= 0:
raise asyncio.TimeoutError()
# asyncio.Future inherits from typing.Awaitable
# asyncio.wait takes Iterable[Union[Future, Generator, Awaitable]], but
# returns Tuple[Set[Future], Set[Future]]. Because mypy doesn't like assigning
# these values to existing Set[Awaitable] or even Set[Union[Awaitable, Future]],
# we need to first cast the results to something that we can actually use
# asyncio.Future: https://github.com/python/typeshed/blob/72ff7b94e534c610ddf8939bacbc55343e9465d2/stdlib/3/asyncio/futures.pyi#L30
# asyncio.wait(): https://github.com/python/typeshed/blob/72ff7b94e534c610ddf8939bacbc55343e9465d2/stdlib/3/asyncio/tasks.pyi#L89
done, pending = cast(
Tuple[Set[Awaitable[T]], Set[Awaitable[T]]],
await asyncio.wait(
pending,
loop=loop,
timeout=remaining,
return_when=asyncio.FIRST_COMPLETED,
),