Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def listener(*args):
# exit immediately if the predicate is none
if predicate is None:
await p.set(outcome.Value(None))
raise ListenerExit
try:
res = predicate(*args)
if inspect.isawaitable(res):
res = await res
except ListenerExit:
# ???
await p.set(outcome.Value(args))
raise
except Exception as e:
# something bad happened, set exception and exit
logger.exception("Exception in wait_for predicate!")
# signal that an error happened
await p.set(outcome.Error(e))
raise ListenerExit
if inspect.isawaitable(res):
res = await res
except ListenerExit:
# ???
await p.set(outcome.Value(args))
raise
except Exception as e:
# something bad happened, set exception and exit
logger.exception("Exception in wait_for predicate!")
# signal that an error happened
await p.set(outcome.Error(e))
raise ListenerExit
else:
# exit now if result is true
if res is True:
await p.set(outcome.Value(args))
raise ListenerExit
async def listener(*args):
# exit immediately if the predicate is none
if predicate is None:
await p.set(outcome.Value(None))
raise ListenerExit
try:
res = predicate(*args)
if inspect.isawaitable(res):
res = await res
except ListenerExit:
# ???
await p.set(outcome.Value(args))
raise
except Exception as e:
# something bad happened, set exception and exit
logger.exception("Exception in wait_for predicate!")
# signal that an error happened
await p.set(outcome.Error(e))
raise ListenerExit
else:
# exit now if result is true
if res is True:
await p.set(outcome.Value(args))
raise ListenerExit