Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self._handler.wait()
return True
except errors.TokenExpired as shutdown:
log.info("Sender disconnected due to token expiry. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
log.info("Sender detached. Attempting reconnect.")
return False
log.info("Sender reconnect failed. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("Sender detached. Attempting reconnect.")
return False
log.info("Sender reconnect failed. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
except errors.AMQPConnectionError as shutdown:
if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect:
log.info("Sender couldn't authenticate. Attempting reconnect.")
return False
log.info("Sender connection error (%r). Shutting down.", shutdown)
error = EventHubError(str(shutdown))
self.close(exception=error)
raise error
except Exception as e:
msg_timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties())
try:
self._handler.open()
self._handler.queue_message(*unsent_events)
self._handler.wait()
return True
except errors.TokenExpired as shutdown:
log.info("Sender disconnected due to token expiry. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
log.info("Sender detached. Attempting reconnect.")
return False
log.info("Sender reconnect failed. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("Sender detached. Attempting reconnect.")
return False
log.info("Sender reconnect failed. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
except errors.AMQPConnectionError as shutdown:
def _create_eventhub_exception(exception):
    """Wrap a lower-level exception in the corresponding EventHub error type.

    The type checks run in a fixed order and the first match wins:

    1. ``errors.AuthenticationException`` -> ``AuthenticationError``
    2. ``errors.VendorLinkDetach`` -> ``ConnectError``
    3. ``errors.LinkDetach``, ``errors.ConnectionClose`` or
       ``errors.MessageHandlerError`` -> ``ConnectionLostError``
    4. ``errors.AMQPConnectionError`` -> ``AuthenticationError`` when the
       message starts with "Unable to open authentication session",
       otherwise ``ConnectError``
    5. ``compat.TimeoutException`` -> ``ConnectionLostError``
    6. anything else -> ``EventHubError``

    :param exception: The exception to translate.
    :returns: A new error built from ``str(exception)`` and the original
        exception. The error is returned to the caller, not raised here.
    """
    message = str(exception)

    if isinstance(exception, errors.AuthenticationException):
        return AuthenticationError(message, exception)

    # NOTE(review): VendorLinkDetach is tested ahead of LinkDetach, presumably
    # because it is a more specific subtype -- confirm before reordering.
    if isinstance(exception, errors.VendorLinkDetach):
        return ConnectError(message, exception)

    # These three kinds all map to the same "connection lost" error.
    lost_connection_kinds = (
        errors.LinkDetach,
        errors.ConnectionClose,
        errors.MessageHandlerError,
    )
    if isinstance(exception, lost_connection_kinds):
        return ConnectionLostError(message, exception)

    if isinstance(exception, errors.AMQPConnectionError):
        # The only way to tell an authentication failure from a generic
        # connection failure here is the prefix of the error message.
        if message.startswith("Unable to open authentication session"):
            return AuthenticationError(message, exception)
        return ConnectError(message, exception)

    if isinstance(exception, compat.TimeoutException):
        return ConnectionLostError(message, exception)

    return EventHubError(message, exception)
if self.error:
raise self.error
if not self.running:
raise ValueError("Unable to receive until client has been started.")
data_batch = []
try:
timeout_ms = 1000 * timeout if timeout else 0
message_batch = self._handler.receive_message_batch(
max_batch_size=max_batch_size,
timeout=timeout_ms)
for message in message_batch:
event_data = EventData(message=message)
self.offset = event_data.offset
data_batch.append(event_data)
return data_batch
except (errors.TokenExpired, errors.AuthenticationException):
log.info("Receiver disconnected due to token error. Attempting reconnect.")
self.reconnect()
return data_batch
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
log.info("Receiver detached. Attempting reconnect.")
self.reconnect()
return data_batch
log.info("Receiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("Receiver detached. Attempting reconnect.")
self.reconnect()
self._handler = SendClientAsync(
self.target,
auth=self.client.get_auth(),
debug=self.client.debug,
msg_timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties(),
loop=self.loop)
try:
await self._handler.open_async()
self._handler.queue_message(*unsent_events)
await self._handler.wait_async()
return True
except errors.TokenExpired as shutdown:
log.info("AsyncSender disconnected due to token expiry. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
log.info("AsyncSender detached. Attempting reconnect.")
return False
log.info("AsyncSender reconnect failed. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("AsyncSender detached. Attempting reconnect.")
return False
def _create_eventhub_exception(exception):
if isinstance(exception, errors.AuthenticationException):
error = AuthenticationError(str(exception), exception)
elif isinstance(exception, errors.VendorLinkDetach):
error = ConnectError(str(exception), exception)
elif isinstance(exception, errors.LinkDetach):
error = ConnectionLostError(str(exception), exception)
elif isinstance(exception, errors.ConnectionClose):
error = ConnectionLostError(str(exception), exception)
elif isinstance(exception, errors.MessageHandlerError):
error = ConnectionLostError(str(exception), exception)
elif isinstance(exception, errors.AMQPConnectionError):
error_type = (
AuthenticationError
if str(exception).startswith("Unable to open authentication session")
else ConnectError
)
error = error_type(str(exception), exception)
elif isinstance(exception, compat.TimeoutException):
error = ConnectionLostError(str(exception), exception)
try:
conn = self._conn_manager.get_connection(
self._address.hostname, mgmt_auth
) # pylint:disable=assignment-from-none
mgmt_client.open(connection=conn)
response = mgmt_client.mgmt_request(
mgmt_msg,
constants.READ_OPERATION,
op_type=op_type,
status_code_field=b"status-code",
description_fields=b"status-description",
)
status_code = response.application_properties[b"status-code"]
if status_code < 400:
return response
raise errors.AuthenticationException(
"Management request error. Status code: {}".format(status_code)
)
except Exception as exception: # pylint: disable=broad-except
last_exception = _handle_exception(exception, self)
self._backoff(
retried_times=retried_times, last_exception=last_exception
)
retried_times += 1
if retried_times > self._config.max_retries:
_LOGGER.info(
"%r returns an exception %r", self._container_id, last_exception
)
raise last_exception
finally:
mgmt_client.close()