Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def wait(self):
    """
    Wait until all transferred events have been sent.

    Delegates to ``self._handler.wait()``. Token/authentication failures
    always trigger ``self.reconnect()``. Link/connection shutdowns trigger a
    reconnect only when the shutdown is marked retryable and
    ``self.auto_reconnect`` is set; handler errors reconnect whenever
    ``self.auto_reconnect`` is set. After a reconnect the method returns
    normally.

    :raises: the stored ``self.error`` if one is already set.
    :raises ValueError: if the client is not running.
    :raises EventHubError: if the handler shut down and no reconnect is
     attempted; the sender is closed with that error first.
    """
    if self.error:
        raise self.error
    if not self.running:
        raise ValueError("Unable to send until client has been started.")
    try:
        self._handler.wait()
    except (errors.TokenExpired, errors.AuthenticationException):
        log.info("Sender disconnected due to token error. Attempting reconnect.")
        self.reconnect()
    except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
        if shutdown.action.retry and self.auto_reconnect:
            log.info("Sender detached. Attempting reconnect.")
            self.reconnect()
        else:
            log.info("Sender detached. Shutting down.")
            error = EventHubError(str(shutdown), shutdown)
            self.close(exception=error)
            raise error
    except errors.MessageHandlerError as shutdown:
        if self.auto_reconnect:
            log.info("Sender detached. Attempting reconnect.")
            self.reconnect()
        else:
            log.info("Sender detached. Shutting down.")
            error = EventHubError(str(shutdown), shutdown)
            self.close(exception=error)
            # Propagate the failure like the LinkDetach/ConnectionClose
            # branch does; otherwise the caller would see a closed sender
            # but a seemingly successful wait().
            raise error
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties())
try:
self._handler.open()
while not self._handler.client_ready():
time.sleep(0.05)
return True
except errors.TokenExpired as shutdown:
log.info("Receiver disconnected due to token expiry. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
log.info("Receiver detached. Attempting reconnect.")
return False
log.info("Receiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("Receiver detached. Attempting reconnect.")
return False
log.info("Receiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
except errors.AMQPConnectionError as shutdown:
"""
if self.error:
raise self.error
if not self.running:
raise ValueError("Unable to send until client has been started.")
if event_data.partition_key and self.partition:
raise ValueError("EventData partition key cannot be used with a partition sender.")
event_data.message.on_send_complete = self._on_outcome
try:
await self._handler.send_message_async(event_data.message)
if self._outcome != constants.MessageSendResult.Ok:
raise Sender._error(self._outcome, self._condition)
except (errors.TokenExpired, errors.AuthenticationException):
log.info("AsyncSender disconnected due to token error. Attempting reconnect.")
await self.reconnect_async()
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
log.info("AsyncSender detached. Attempting reconnect.")
await self.reconnect_async()
else:
log.info("AsyncSender detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("AsyncSender detached. Attempting reconnect.")
await self.reconnect_async()
else:
log.info("AsyncSender detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
errors.MessageReleased,
errors.MessageContentTooLarge,
),
):
_LOGGER.info("%r Event data error (%r)", name, exception)
error = EventDataError(str(exception), exception)
raise error
elif isinstance(exception, errors.MessageException):
_LOGGER.info("%r Event data send error (%r)", name, exception)
error = EventDataSendError(str(exception), exception)
raise error
else:
if isinstance(exception, errors.AuthenticationException):
if hasattr(closable, "_close_connection"):
closable._close_connection() # pylint:disable=protected-access
elif isinstance(exception, errors.LinkDetach):
if hasattr(closable, "_close_handler"):
closable._close_handler() # pylint:disable=protected-access
elif isinstance(exception, errors.ConnectionClose):
if hasattr(closable, "_close_connection"):
closable._close_connection() # pylint:disable=protected-access
elif isinstance(exception, errors.MessageHandlerError):
if hasattr(closable, "_close_handler"):
closable._close_handler() # pylint:disable=protected-access
elif isinstance(exception, errors.AMQPConnectionError):
if hasattr(closable, "_close_connection"):
closable._close_connection() # pylint:disable=protected-access
elif isinstance(exception, compat.TimeoutException):
pass # Timeout doesn't need to recreate link or connection to retry
else:
if hasattr(closable, "_close_connection"):
closable._close_connection() # pylint:disable=protected-access
async def _handle_exception(self, exception):
    """Convert session-lock link detaches into session errors.

    A ``LinkDetach`` whose condition is ``SESSION_LOCK_LOST`` becomes a
    ``SessionLockExpired``; one whose condition is ``SESSION_LOCK_TIMEOUT``
    becomes a ``NoActiveSession``. In both cases this object is closed with
    the new error, which is then raised. Every other exception is passed to
    the parent class's ``_handle_exception`` and its result is returned.
    """
    if isinstance(exception, errors.LinkDetach):
        detach_condition = exception.condition
        session_error = None
        if detach_condition == SESSION_LOCK_LOST:
            session_error = SessionLockExpired(
                "Connection detached - lock on Session {} lost.".format(self.session_id)
            )
        elif detach_condition == SESSION_LOCK_TIMEOUT:
            session_error = NoActiveSession("Queue has no active session to receive from.")
        if session_error is not None:
            await self.close(exception=session_error)
            raise session_error
    # Not a session-lock problem: defer to the generic handling.
    return await super(SessionReceiver, self)._handle_exception(exception)
def _create_eventhub_exception(exception):
    """Wrap a low-level exception in the matching Event Hubs error type.

    The returned error is built from ``str(exception)`` and the original
    exception object. Mapping, checked in this order:

    * ``AuthenticationException`` -> ``AuthenticationError``
    * ``VendorLinkDetach`` -> ``ConnectError``
    * ``LinkDetach`` / ``ConnectionClose`` / ``MessageHandlerError``
      -> ``ConnectionLostError``
    * ``AMQPConnectionError`` -> ``AuthenticationError`` when the message
      starts with "Unable to open authentication session", otherwise
      ``ConnectError``
    * ``compat.TimeoutException`` -> ``ConnectionLostError``
    * anything else -> ``EventHubError``

    :param exception: the exception to translate.
    :return: the new error instance; it is returned, not raised.
    """
    message = str(exception)
    if isinstance(exception, errors.AuthenticationException):
        return AuthenticationError(message, exception)
    # NOTE(review): VendorLinkDetach is tested before LinkDetach, presumably
    # because it is a subclass - keep this ordering.
    if isinstance(exception, errors.VendorLinkDetach):
        return ConnectError(message, exception)
    if isinstance(
        exception,
        (errors.LinkDetach, errors.ConnectionClose, errors.MessageHandlerError),
    ):
        return ConnectionLostError(message, exception)
    if isinstance(exception, errors.AMQPConnectionError):
        error_type = (
            AuthenticationError
            if message.startswith("Unable to open authentication session")
            else ConnectError
        )
        return error_type(message, exception)
    if isinstance(exception, compat.TimeoutException):
        return ConnectionLostError(message, exception)
    return EventHubError(message, exception)
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties(),
loop=self.loop)
try:
await self._handler.open_async()
while not await self._handler.client_ready_async():
await asyncio.sleep(0.05)
return True
except errors.TokenExpired as shutdown:
log.info("AsyncReceiver disconnected due to token expiry. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
log.info("AsyncReceiver detached. Attempting reconnect.")
return False
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("AsyncReceiver detached. Attempting reconnect.")
return False
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.AMQPConnectionError as shutdown:
errors.MessageReleased,
errors.MessageContentTooLarge,
),
):
_LOGGER.info("%r Event data error (%r)", name, exception)
error = EventDataError(str(exception), exception)
raise error
elif isinstance(exception, errors.MessageException):
_LOGGER.info("%r Event data send error (%r)", name, exception)
error = EventDataSendError(str(exception), exception)
raise error
else:
try:
if isinstance(exception, errors.AuthenticationException):
await closable._close_connection_async()
elif isinstance(exception, errors.LinkDetach):
await cast("ConsumerProducerMixin", closable)._close_handler_async()
elif isinstance(exception, errors.ConnectionClose):
await closable._close_connection_async()
elif isinstance(exception, errors.MessageHandlerError):
await cast("ConsumerProducerMixin", closable)._close_handler_async()
elif isinstance(exception, errors.AMQPConnectionError):
await closable._close_connection_async()
elif isinstance(exception, compat.TimeoutException):
pass # Timeout doesn't need to recreate link or connection to retry
else:
await closable._close_connection_async()
except AttributeError:
pass
return _create_eventhub_exception(exception)
def _handle_exception(self, exception):
    """Convert session-lock link detaches into session errors.

    A ``LinkDetach`` with condition ``SESSION_LOCK_LOST`` is replaced by
    ``SessionLockExpired`` and one with ``SESSION_LOCK_TIMEOUT`` by
    ``NoActiveSession``; this object is closed with the replacement error
    before it is raised. Any other exception is forwarded to the parent
    class's ``_handle_exception`` and that result is returned.
    """
    session_error = None
    if isinstance(exception, errors.LinkDetach):
        if exception.condition == SESSION_LOCK_LOST:
            session_error = SessionLockExpired(
                "Connection detached - lock on Session {} lost.".format(self.session_id)
            )
        elif exception.condition == SESSION_LOCK_TIMEOUT:
            session_error = NoActiveSession("Queue has no active session to receive from.")

    if session_error is None:
        # Not a session-lock problem: defer to the generic handling.
        return super(SessionReceiver, self)._handle_exception(exception)

    self.close(exception=session_error)
    raise session_error
retried_times = 0
last_exception = None
max_retries = (
self._client._config.max_retries # pylint:disable=protected-access
)
while retried_times <= max_retries:
try:
await self._open()
await cast(ReceiveClientAsync, self._handler).do_work_async()
break
except asyncio.CancelledError: # pylint: disable=try-except-raise
raise
except Exception as exception: # pylint: disable=broad-except
if (
isinstance(exception, uamqp.errors.LinkDetach)
and exception.condition == uamqp.constants.ErrorCodes.LinkStolen # pylint: disable=no-member
):
raise await self._handle_exception(exception)
if not self.running: # exit by close
return
if self._last_received_event:
self._offset = self._last_received_event.offset
last_exception = await self._handle_exception(exception)
retried_times += 1
if retried_times > max_retries:
_LOGGER.info(
"%r operation has exhausted retry. Last exception: %r.",
self._name,
last_exception,
)
raise last_exception