Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, gf, *args, group=0, **kwargs):
    """Create a task and register it under a freshly allocated task number.

    ``gf`` is called with a ``TaskId`` wrapping the current value of
    ``Cancellable.task_no`` as its first positional argument, followed by
    ``*args`` and ``**kwargs``; whatever it returns is treated as the task.
    The task is stored in ``self.tasks`` as ``[task, group, None]`` keyed
    by that task number, and the class-wide counter is then advanced.

    Args:
        gf: Callable that produces the task object.
        *args: Extra positional arguments forwarded to ``gf``.
        group: Value stored alongside the task in its registry entry.
        **kwargs: Extra keyword arguments forwarded to ``gf``.

    Raises:
        ValueError: If the object returned by ``gf`` is already registered.
    """
    task = gf(TaskId(Cancellable.task_no), *args, **kwargs)
    # self.tasks is indexed by task number (see the assignment below), so a
    # plain ``task in self.tasks`` would compare the task against the
    # numeric keys and could never match. Compare against the stored task
    # objects (slot 0 of each entry) so the guard can actually trigger.
    # NOTE(review): assumes self.tasks is a dict-like mapping whose values
    # all use the [task, group, ...] layout written here - confirm against
    # the class definition.
    if any(entry[0] is task for entry in self.tasks.values()):
        raise ValueError('Task already exists.')
    self.tasks[Cancellable.task_no] = [task, group, None]
    self.task_no = Cancellable.task_no  # For subclass
    Cancellable.task_no += 1
    self.task = task
async def new_gen(*args, **kwargs):
    """Await the wrapped callable ``f`` with the TaskId argument removed.

    The TaskId is expected as the first positional argument, or as the
    second when the first is something else (the bound-method case, where
    ``args[0]`` is the instance). It is stripped before ``f`` is called.
    ``NamedTask._stopped(task_id)`` is always invoked when the awaited
    object finishes, whether it returns normally or raises.

    Returns:
        The result of awaiting ``f(...)``.
    """
    # Declared ``async`` because the body awaits: ``await`` inside a plain
    # ``def`` is a SyntaxError in standard Python.
    if isinstance(args[0], TaskId):  # Not a bound method
        task_id = args[0]
        g = f(*args[1:], **kwargs)
    else:  # Task ID is args[1] if a bound method
        task_id = args[1]
        args = (args[0],) + args[2:]
        g = f(*args, **kwargs)
    try:
        return await g
    finally:
        # Runs on normal return and on any exception raised through the await.
        NamedTask._stopped(task_id)
return new_gen