Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
callback: Callable[..., None], *args: Any) -> None:
try:
self._loop.call_soon_threadsafe(callback, *args)
except RuntimeError:
# Deliberately ignore RuntimeError here (e.g. the loop is already closed); swallowing agreed in #2
pass
self._call_soon_threadsafe = checked_call_soon_threadsafe
def checked_call_soon(
        callback: Callable[..., None], *args: Any) -> None:
    """Schedule ``callback(*args)`` on ``self._loop`` unless it is closed.

    When ``self._loop.is_closed()`` reports true, the request is dropped
    without raising; otherwise the callback and its positional arguments
    are forwarded to ``self._loop.call_soon``.
    """
    if self._loop.is_closed():
        # Closed loop: nothing gets scheduled.
        return
    self._loop.call_soon(callback, *args)
self._call_soon = checked_call_soon
self._sync_queue = _SyncQueueProxy(self)
self._async_queue = _AsyncQueueProxy(self)