Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _send_event_data(self, timeout_time=None, last_exception=None):
# type: (Optional[float], Optional[Exception]) -> None
if self._unsent_events:
self._open()
self._set_msg_timeout(timeout_time, last_exception)
self._handler.queue_message(*self._unsent_events) # type: ignore
self._handler.wait() # type: ignore
self._unsent_events = self._handler.pending_messages # type: ignore
if self._outcome != constants.MessageSendResult.Ok:
if self._outcome == constants.MessageSendResult.Timeout:
self._condition = OperationTimeoutError("Send operation timed out")
if self._condition:
raise self._condition
def __init__(self, max_size_in_bytes=None, partition_id=None, partition_key=None):
# type: (Optional[int], Optional[str], Optional[Union[str, bytes]]) -> None
self.max_size_in_bytes = max_size_in_bytes or constants.MAX_MESSAGE_LENGTH_BYTES
self.message = BatchMessage(data=[], multi_messages=False, properties=None)
self._partition_id = partition_id
self._partition_key = partition_key
set_message_partition_key(self.message, self._partition_key)
self._size = self.message.gather()[0].get_message_encoded_size()
self._count = 0
__path__ = __import__("pkgutil").extend_path(__path__, __name__) # type: ignore
from uamqp import constants
from ._common import EventData, EventDataBatch
from ._version import VERSION
__version__ = VERSION
from ._producer_client import EventHubProducerClient
from ._consumer_client import EventHubConsumerClient
from ._client_base import EventHubSharedKeyCredential
from ._eventprocessor.checkpoint_store import CheckpointStore
from ._eventprocessor.common import CloseReason
from ._eventprocessor.partition_context import PartitionContext
TransportType = constants.TransportType
__all__ = [
"EventData",
"EventDataBatch",
"EventHubProducerClient",
"EventHubConsumerClient",
"TransportType",
"EventHubSharedKeyCredential",
"CheckpointStore",
"CloseReason",
"PartitionContext",
]
# type: () -> None
retried_times = 0
last_exception = None
max_retries = (
self._client._config.max_retries # pylint:disable=protected-access
)
while retried_times <= max_retries:
try:
if self._open():
self._handler.do_work() # type: ignore
return
except Exception as exception: # pylint: disable=broad-except
if (
isinstance(exception, uamqp.errors.LinkDetach)
and exception.condition == uamqp.constants.ErrorCodes.LinkStolen # pylint: disable=no-member
):
raise self._handle_exception(exception)
if not self.running: # exit by close
return
if self._last_received_event:
self._offset = self._last_received_event.offset
last_exception = self._handle_exception(exception)
retried_times += 1
if retried_times > max_retries:
_LOGGER.info(
"%r operation has exhausted retry. Last exception: %r.",
self._name,
last_exception,
)
raise last_exception
def __init__(self, message, details=None):
self.error = None
self.message = message
self.details = details
if isinstance(message, constants.MessageSendResult):
self.message = "Message send failed with result: {}".format(message)
if details and isinstance(details, Exception):
try:
condition = details.condition.value.decode('UTF-8')
except AttributeError:
condition = details.condition.decode('UTF-8')
_, _, self.error = condition.partition(':')
self.message += "\nError: {}".format(self.error)
try:
self._parse_error(details.description)
for detail in self.details:
self.message += "\n{}".format(detail)
except: # pylint: disable=bare-except
self.message += "\n{}".format(details)
super(EventHubError, self).__init__(self.message)
-'partition_count'
-'partition_ids'
:rtype: dict
"""
alt_creds = {
"username": self._auth_config.get("iot_username"),
"password":self._auth_config.get("iot_password")}
try:
mgmt_auth = self._create_auth(**alt_creds)
mgmt_client = uamqp.AMQPClient(self.mgmt_target, auth=mgmt_auth, debug=self.debug)
mgmt_client.open()
mgmt_msg = Message(application_properties={'name': self.eh_name})
response = mgmt_client.mgmt_request(
mgmt_msg,
constants.READ_OPERATION,
op_type=b'com.microsoft:eventhub',
status_code_field=b'status-code',
description_fields=b'status-description')
eh_info = response.get_data()
output = {}
if eh_info:
output['name'] = eh_info[b'name'].decode('utf-8')
output['type'] = eh_info[b'type'].decode('utf-8')
output['created_at'] = datetime.datetime.fromtimestamp(float(eh_info[b'created_at'])/1000)
output['partition_count'] = eh_info[b'partition_count']
output['partition_ids'] = [p.decode('utf-8') for p in eh_info[b'partition_ids']]
return output
finally:
mgmt_client.close()
async def _management_request_async(self, mgmt_msg: Message, op_type: bytes) -> Any:
retried_times = 0
last_exception = None
while retried_times <= self._config.max_retries:
mgmt_auth = await self._create_auth_async()
mgmt_client = AMQPClientAsync(
self._mgmt_target, auth=mgmt_auth, debug=self._config.network_tracing
)
try:
conn = await self._conn_manager_async.get_connection(
self._address.hostname, mgmt_auth
)
await mgmt_client.open_async(connection=conn)
response = await mgmt_client.mgmt_request_async(
mgmt_msg,
constants.READ_OPERATION,
op_type=op_type,
status_code_field=b"status-code",
description_fields=b"status-description",
)
status_code = response.application_properties[b"status-code"]
if status_code < 400:
return response
raise errors.AuthenticationException(
"Management request error. Status code: {}".format(status_code)
)
except Exception as exception: # pylint:disable=broad-except
last_exception = await _handle_exception(exception, self)
await self._backoff_async(
retried_times=retried_times, last_exception=last_exception
)
retried_times += 1
"""
Get details on the specified EventHub async.
:rtype: dict
"""
alt_creds = {
"username": self._auth_config.get("iot_username"),
"password":self._auth_config.get("iot_password")}
try:
mgmt_auth = self._create_auth(**alt_creds)
mgmt_client = AMQPClientAsync(self.mgmt_target, auth=mgmt_auth, debug=self.debug)
await mgmt_client.open_async()
mgmt_msg = Message(application_properties={'name': self.eh_name})
response = await mgmt_client.mgmt_request_async(
mgmt_msg,
constants.READ_OPERATION,
op_type=b'com.microsoft:eventhub',
status_code_field=b'status-code',
description_fields=b'status-description')
eh_info = response.get_data()
output = {}
if eh_info:
output['name'] = eh_info[b'name'].decode('utf-8')
output['type'] = eh_info[b'type'].decode('utf-8')
output['created_at'] = datetime.datetime.fromtimestamp(float(eh_info[b'created_at'])/1000)
output['partition_count'] = eh_info[b'partition_count']
output['partition_ids'] = [p.decode('utf-8') for p in eh_info[b'partition_ids']]
return output
finally:
await mgmt_client.close_async()
async def _send_event_data(
self,
timeout_time: Optional[float] = None,
last_exception: Optional[Exception] = None,
) -> None:
# TODO: Correct uAMQP type hints
if self._unsent_events:
await self._open()
self._set_msg_timeout(timeout_time, last_exception)
self._handler.queue_message(*self._unsent_events) # type: ignore
await self._handler.wait_async() # type: ignore
self._unsent_events = self._handler.pending_messages # type: ignore
if self._outcome != constants.MessageSendResult.Ok:
if self._outcome == constants.MessageSendResult.Timeout:
self._condition = OperationTimeoutError("Send operation timed out")
if self._condition:
raise self._condition
def _handle_exception(self, exception):
if isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)):
if exception.action and exception.action.retry and self.auto_reconnect:
_log.info("Handler detached. Attempting reconnect.")
self.reconnect()
elif exception.condition == constants.ErrorCodes.UnauthorizedAccess:
_log.info("Handler detached. Shutting down.")
error = ServiceBusAuthorizationError(str(exception), exception)
self.close(exception=error)
raise error
else:
_log.info("Handler detached. Shutting down.")
error = ServiceBusConnectionError(str(exception), exception)
self.close(exception=error)
raise error
elif isinstance(exception, errors.MessageHandlerError):
if self.auto_reconnect:
_log.info("Handler error. Attempting reconnect.")
self.reconnect()
else:
_log.info("Handler error. Shutting down.")
error = ServiceBusConnectionError(str(exception), exception)