Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"password": self._auth_config.get("iot_password")}
# Retry loop for a management request. Each pass builds a fresh auth object and
# a fresh uamqp.AMQPClient, opens it, issues one mgmt_request with
# constants.READ_OPERATION, and returns the response on success.
# NOTE(review): indentation is not preserved in this view; the nesting described
# below is inferred from statement order and keywords - confirm against the file.
connect_count = 0
while True:
connect_count += 1
# Auth and client are re-created on every attempt rather than reused.
mgmt_auth = self._create_auth(**alt_creds)
mgmt_client = uamqp.AMQPClient(self.mgmt_target, auth=mgmt_auth, debug=self.config.network_tracing)
try:
mgmt_client.open()
response = mgmt_client.mgmt_request(
mgmt_msg,
constants.READ_OPERATION,
op_type=op_type,
status_code_field=b'status-code',
description_fields=b'status-description')
return response
except (errors.AMQPConnectionError, errors.TokenAuthFailure, compat.TimeoutException) as failure:
# Only these three error types are handled here. While connect_count is below
# self.config.max_retries nothing is raised, so control falls through to the
# next loop iteration; no delay between attempts is visible in this fragment.
if connect_count >= self.config.max_retries:
err = ConnectError(
"Can not connect to EventHubs or get management info from the service. "
"Please make sure the connection string or token is correct and retry. "
"Besides, this method doesn't work if you use an IoT connection string.",
failure
)
raise err
finally:
# The per-attempt client is closed on every exit path: successful return,
# retried failure, and the final raise.
mgmt_client.close()
if shutdown.action.retry and self.auto_reconnect:
log.info("AsyncSender detached. Attempting reconnect.")
return False
log.info("AsyncSender reconnect failed. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("AsyncSender detached. Attempting reconnect.")
return False
log.info("AsyncSender reconnect failed. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.AMQPConnectionError as shutdown:
if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect:
log.info("AsyncSender couldn't authenticate. Attempting reconnect.")
return False
log.info("AsyncSender connection error (%r). Shutting down.", shutdown)
error = EventHubError(str(shutdown))
await self.close_async(exception=error)
raise error
except Exception as e:
log.info("Unexpected error occurred (%r). Shutting down.", e)
error = EventHubError("Sender reconnect failed: {}".format(e))
await self.close_async(exception=error)
raise error
if shutdown.action.retry and self.auto_reconnect:
log.info("AsyncReceiver detached. Attempting reconnect.")
return False
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("AsyncReceiver detached. Attempting reconnect.")
return False
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.AMQPConnectionError as shutdown:
if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect:
log.info("AsyncReceiver couldn't authenticate. Attempting reconnect.")
return False
log.info("AsyncReceiver connection error (%r). Shutting down.", shutdown)
error = EventHubError(str(shutdown))
await self.close_async(exception=error)
raise error
except Exception as e:
log.info("Unexpected error occurred (%r). Shutting down.", e)
error = EventHubError("Receiver reconnect failed: {}".format(e))
await self.close_async(exception=error)
raise error
if shutdown.action.retry and self.auto_reconnect:
log.info("Receiver detached. Attempting reconnect.")
return False
log.info("Receiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("Receiver detached. Attempting reconnect.")
return False
log.info("Receiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
except errors.AMQPConnectionError as shutdown:
if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect:
log.info("Receiver couldn't authenticate. Attempting reconnect.")
return False
log.info("Receiver connection error (%r). Shutting down.", shutdown)
error = EventHubError(str(shutdown))
self.close(exception=error)
raise error
except Exception as e:
log.info("Unexpected error occurred (%r). Shutting down.", e)
error = EventHubError("Receiver reconnect failed: {}".format(e))
self.close(exception=error)
raise error
if shutdown.action.retry and self.auto_reconnect:
log.info("Sender detached. Attempting reconnect.")
return False
log.info("Sender reconnect failed. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("Sender detached. Attempting reconnect.")
return False
log.info("Sender reconnect failed. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
self.close(exception=error)
raise error
except errors.AMQPConnectionError as shutdown:
if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect:
log.info("Sender couldn't authenticate. Attempting reconnect.")
return False
log.info("Sender connection error (%r). Shutting down.", shutdown)
error = EventHubError(str(shutdown))
self.close(exception=error)
raise error
except Exception as e:
log.info("Unexpected error occurred (%r). Shutting down.", e)
error = EventHubError("Sender Reconnect failed: {}".format(e))
self.close(exception=error)
raise error
if shutdown.action.retry and self.auto_reconnect:
log.info("AsyncSender detached. Attempting reconnect.")
return False
log.info("AsyncSender reconnect failed. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("AsyncSender detached. Attempting reconnect.")
return False
log.info("AsyncSender reconnect failed. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.AMQPConnectionError as shutdown:
if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect:
log.info("AsyncSender couldn't authenticate. Attempting reconnect.")
return False
log.info("AsyncSender connection error (%r). Shutting down.", shutdown)
error = EventHubError(str(shutdown))
await self.close_async(exception=error)
raise error
except Exception as e:
log.info("Unexpected error occurred (%r). Shutting down.", e)
error = EventHubError("Sender reconnect failed: {}".format(e))
await self.close_async(exception=error)
raise error
raise error
elif isinstance(exception, errors.MessageException):
_LOGGER.info("%r Event data send error (%r)", name, exception)
error = EventDataSendError(str(exception), exception)
raise error
else:
try:
if isinstance(exception, errors.AuthenticationException):
await closable._close_connection_async()
elif isinstance(exception, errors.LinkDetach):
await cast("ConsumerProducerMixin", closable)._close_handler_async()
elif isinstance(exception, errors.ConnectionClose):
await closable._close_connection_async()
elif isinstance(exception, errors.MessageHandlerError):
await cast("ConsumerProducerMixin", closable)._close_handler_async()
elif isinstance(exception, errors.AMQPConnectionError):
await closable._close_connection_async()
elif isinstance(exception, compat.TimeoutException):
pass # Timeout doesn't need to recreate link or connection to retry
else:
await closable._close_connection_async()
except AttributeError:
pass
return _create_eventhub_exception(exception)
device_filter_txt if device_filter_txt else ""
)
)
# Receive loop: creates a uamqp.ReceiveClient for "amqps://" + endpoint and
# iterates incoming messages until handle_msg returns a truthy value.
# NOTE(review): indentation is not preserved in this view; the nesting described
# below is inferred from statement order and keywords - confirm against the file.
try:
client = uamqp.ReceiveClient(
"amqps://" + endpoint, client_name=_get_container_id(), debug=DEBUG
)
message_generator = client.receive_messages_iter()
for msg in message_generator:
match = handle_msg(msg)
if match:
# Only the matching message is accepted; the truthy result of handle_msg is
# returned to the caller as-is.
logger.info("Requested message Id has been matched...")
msg.accept()
return match
except uamqp.errors.AMQPConnectionError:
# A connection error is logged at debug level and not re-raised by this handler.
logger.debug("AMQPS connection has expired...")
finally:
# NOTE(review): within this fragment `client` is first bound inside the try.
# Unless it is bound earlier in the enclosing function, a failure in the
# ReceiveClient constructor would reach this close() with `client` unbound -
# confirm, and consider pre-binding to None with a guard here.
client.close()
raise error
else:
_log.info("Async handler detached. Shutting down.")
error = ServiceBusConnectionError(str(exception), exception)
await self.close(exception=error)
raise error
elif isinstance(exception, errors.MessageHandlerError):
if self.auto_reconnect:
_log.info("Async handler error. Attempting reconnect.")
await self.reconnect()
else:
_log.info("Async handler error. Shutting down.")
error = ServiceBusConnectionError(str(exception), exception)
await self.close(exception=error)
raise error
elif isinstance(exception, errors.AMQPConnectionError):
message = "Failed to open handler: {}".format(exception)
raise ServiceBusConnectionError(message, exception)
else:
_log.info("Unexpected error occurred (%r). Shutting down.", exception)
error = ServiceBusError("Handler failed: {}".format(exception), exception)
await self.close(exception=error)
raise error
if shutdown.action.retry and self.auto_reconnect:
log.info("AsyncReceiver detached. Attempting reconnect.")
return False
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("AsyncReceiver detached. Attempting reconnect.")
return False
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.AMQPConnectionError as shutdown:
if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect:
log.info("AsyncReceiver couldn't authenticate. Attempting reconnect.")
return False
log.info("AsyncReceiver connection error (%r). Shutting down.", shutdown)
error = EventHubError(str(shutdown))
await self.close_async(exception=error)
raise error
except Exception as e:
log.info("Unexpected error occurred (%r). Shutting down.", e)
error = EventHubError("Receiver reconnect failed: {}".format(e))
await self.close_async(exception=error)
raise error