Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def wait(self):
"""
Wait until all transferred events have been sent.
"""
if self.error:
raise self.error
if not self.running:
raise ValueError("Unable to send until client has been started.")
try:
self._handler.wait()
except (errors.TokenExpired, errors.AuthenticationException):
log.info("Sender disconnected due to token error. Attempting reconnect.")
self.reconnect()
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
log.info("Sender detached. Attempting reconnect.")
self.reconnect()
else:
log.info("Sender detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("Sender detached. Attempting reconnect.")
self.reconnect()
else:
errors.MessageAccepted,
errors.MessageAlreadySettled,
errors.MessageModified,
errors.MessageRejected,
errors.MessageReleased,
errors.MessageContentTooLarge)
):
log.info("%r Event data error (%r)", name, exception)
error = EventDataError(str(exception), exception)
raise error
elif isinstance(exception, errors.MessageException):
log.info("%r Event data send error (%r)", name, exception)
error = EventDataSendError(str(exception), exception)
raise error
else:
if isinstance(exception, errors.AuthenticationException):
if hasattr(closable, "_close_connection"):
await closable._close_connection() # pylint:disable=protected-access
elif isinstance(exception, errors.LinkDetach):
if hasattr(closable, "_close_handler"):
await closable._close_handler() # pylint:disable=protected-access
elif isinstance(exception, errors.ConnectionClose):
if hasattr(closable, "_close_connection"):
await closable._close_connection() # pylint:disable=protected-access
elif isinstance(exception, errors.MessageHandlerError):
if hasattr(closable, "_close_handler"):
await closable._close_handler() # pylint:disable=protected-access
elif isinstance(exception, errors.AMQPConnectionError):
if hasattr(closable, "_close_connection"):
await closable._close_connection() # pylint:disable=protected-access
elif isinstance(exception, compat.TimeoutException):
pass # Timeout doesn't need to recreate link or connection to retry
def _handle_exception(self, exception):
if not self.running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return _handle_exception(exception, self)
async def _handle_exception(self, exception):
if not self._running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return await _handle_exception(exception, self)
return await _handle_exception(exception, self)
def _handle_exception(self, exception):
if not self._running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return _handle_exception(exception, self)
async def _handle_exception(self, exception: Exception) -> Exception:
if not self.running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return await _handle_exception(exception, self)
return await _handle_exception(exception, self)
async def wait_async(self):
"""
Wait until all transferred events have been sent.
"""
if self.error:
raise self.error
if not self.running:
raise ValueError("Unable to send until client has been started.")
try:
await self._handler.wait_async()
except (errors.TokenExpired, errors.AuthenticationException):
log.info("AsyncSender disconnected due to token error. Attempting reconnect.")
await self.reconnect_async()
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
log.info("AsyncSender detached. Attempting reconnect.")
await self.reconnect_async()
else:
log.info("AsyncSender detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("AsyncSender detached. Attempting reconnect.")
await self.reconnect_async()
else: