Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, loop=None):
if loop is None:
self._loop = asyncio.get_event_loop()
else:
self._loop = loop
self._users: Optional[Users] = None
self._scheduler: Optional[aiojobs.Scheduler] = None
def handle_loop_error(
root_app: web.Application,
loop: asyncio.AbstractEventLoop,
context: Mapping[str, Any],
) -> None:
if isinstance(loop, aiojobs.Scheduler):
loop = current_loop()
exception = context.get('exception')
msg = context.get('message', '(empty message)')
if exception is not None:
if sys.exc_info()[0] is not None:
log.exception('Error inside event loop: {0}', msg)
if 'error_monitor' in root_app:
loop.create_task(root_app['error_monitor'].capture_exception())
else:
exc_info = (type(exception), exception, exception.__traceback__)
log.error('Error inside event loop: {0}', msg, exc_info=exc_info)
if 'error_monitor' in root_app:
loop.create_task(root_app['error_monitor'].capture_exception(exception))