Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
next_arg = 0
while True:
while next_arg < len(args) and (limit == -1 or len(pending) < limit):
# We have to defer the creation of the Task as long as possible
# because once we do, it starts executing, regardless of what we
# have in the pending set.
if args[next_arg] in input_map:
input_map[args[next_arg]].append(next_arg)
else:
# We call ensure_future directly to ensure that we have a Task
# because the return value of asyncio.wait will be an implicit
# task otherwise, and we won't be able to know which input it
# corresponds to.
task: asyncio.Future[T] = asyncio.ensure_future(args[next_arg])
pending.add(task)
pos[task] = next_arg
input_map[args[next_arg]] = [next_arg]
next_arg += 1
# pending might be empty if the last items of args were dupes;
# asyncio.wait([]) will raise an exception.
if pending:
done, pending = await asyncio.wait(
pending, loop=loop, return_when=asyncio.FIRST_COMPLETED
)
for x in done:
if return_exceptions and x.exception():
ret[pos[x]] = x.exception()
else:
ret[pos[x]] = x.result()