Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
try:
asynclib_name = sniffio.current_async_library()
except sniffio.AsyncLibraryNotFoundError:
pass
else:
raise RuntimeError(f'Already running {asynclib_name} in this thread')
try:
asynclib = import_module(f'{__name__}._backends._{backend}')
except ImportError as exc:
raise LookupError(f'No such backend: {backend}') from exc
token = None
if sniffio.current_async_library_cvar.get(None) is None:
# Since we're in control of the event loop, we can cache the name of the async library
token = sniffio.current_async_library_cvar.set(backend)
try:
backend_options = backend_options or {}
return asynclib.run(func, *args, **backend_options) # type: ignore
finally:
if token:
sniffio.current_async_library_cvar.reset(token)