Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.prefetch = prefetch
self.epoch = epoch
self.keep_alive = keep_alive
self.auto_reconnect = auto_reconnect
self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
self.reconnect_backoff = 1
self.properties = None
self.redirected = None
self.error = None
partition = self.source.split('/')[-1]
self.name = "EHReceiver-{}-partition{}".format(uuid.uuid4(), partition)
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset.selector())
if epoch:
self.properties = {types.AMQPSymbol(self._epoch): types.AMQPLong(int(epoch))}
self._handler = ReceiveClient(
source,
auth=self.client.get_auth(),
debug=self.client.debug,
prefetch=self.prefetch,
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties())
self._reconnect_backoff = 1
self._link_properties = {} # type: Dict[types.AMQPType, types.AMQPType]
self._error = None
self._timeout = 0
self._idle_timeout = (idle_timeout * 1000) if idle_timeout else None
partition = self._source.split("/")[-1]
self._partition = partition
self._name = "EHConsumer-{}-partition{}".format(uuid.uuid4(), partition)
if owner_level:
self._link_properties[types.AMQPSymbol(EPOCH_SYMBOL)] = types.AMQPLong(
int(owner_level)
)
link_property_timeout_ms = (
self._client._config.receive_timeout or self._timeout # pylint:disable=protected-access
) * 1000
self._link_properties[types.AMQPSymbol(TIMEOUT_SYMBOL)] = types.AMQPLong(
int(link_property_timeout_ms)
)
self._handler = None # type: Optional[ReceiveClient]
self._track_last_enqueued_event_properties = (
track_last_enqueued_event_properties
)
self._last_received_event = None # type: Optional[EventData]
self._owner_level = owner_level
self._keep_alive = keep_alive
self._auto_reconnect = auto_reconnect
self._retry_policy = errors.ErrorPolicy(
max_retries=self._client._config.max_retries, on_error=_error_handler # pylint:disable=protected-access
)
self._reconnect_backoff = 1
self._link_properties = {} # type: Dict[types.AMQPType, types.AMQPType]
self._error = None
self._timeout = 0
self._idle_timeout = (idle_timeout * 1000) if idle_timeout else None
partition = self._source.split("/")[-1]
self._partition = partition
self._name = "EHConsumer-{}-partition{}".format(uuid.uuid4(), partition)
if owner_level:
self._link_properties[types.AMQPSymbol(EPOCH_SYMBOL)] = types.AMQPLong(
int(owner_level)
)
link_property_timeout_ms = (
self._client._config.receive_timeout or self._timeout # pylint:disable=protected-access
) * 1000
self._link_properties[types.AMQPSymbol(TIMEOUT_SYMBOL)] = types.AMQPLong(
int(link_property_timeout_ms)
)
self._handler = None # type: Optional[ReceiveClient]
self._track_last_enqueued_event_properties = (
track_last_enqueued_event_properties
)
self._last_received_event = None # type: Optional[EventData]
)
self._reconnect_backoff = 1
self._timeout = 0
self._idle_timeout = (idle_timeout * 1000) if idle_timeout else None
self._link_properties = {} # type: Dict[types.AMQPType, types.AMQPType]
partition = self._source.split("/")[-1]
self._partition = partition
self._name = "EHReceiver-{}-partition{}".format(uuid.uuid4(), partition)
if owner_level:
self._link_properties[types.AMQPSymbol(EPOCH_SYMBOL)] = types.AMQPLong(
int(owner_level)
)
link_property_timeout_ms = (
self._client._config.receive_timeout or self._timeout # pylint:disable=protected-access
) * 1000
self._link_properties[types.AMQPSymbol(TIMEOUT_SYMBOL)] = types.AMQPLong(
int(link_property_timeout_ms)
)
self._handler = None # type: Optional[ReceiveClientAsync]
self._track_last_enqueued_event_properties = (
track_last_enqueued_event_properties
)
self._event_queue = queue.Queue()
self._last_received_event = None # type: Optional[EventData]
:start-after: [START receiver_defer_session_messages]
:end-before: [END receiver_defer_session_messages]
:language: python
:dedent: 8
:caption: Defer messages, then retrieve them by sequence number.
"""
if not sequence_numbers:
raise ValueError("At least one sequence number must be specified.")
await self._can_run()
try:
receive_mode = mode.value.value
except AttributeError:
receive_mode = int(mode)
message = {
'sequence-numbers': types.AMQPArray([types.AMQPLong(s) for s in sequence_numbers]),
'receiver-settle-mode': types.AMQPuInt(receive_mode),
'session-id': self.session_id
}
handler = functools.partial(mgmt_handlers.deferred_message_op, mode=receive_mode, message_type=DeferredMessage)
messages = await self._mgmt_request_response(
REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER,
message,
handler)
for m in messages:
m._receiver = self # pylint: disable=protected-access
return messages
:param sequence_numbers: The seqeuence numbers of the scheduled messages.
:type sequence_numbers: int
Example:
.. literalinclude:: ../examples/async_examples/test_examples_async.py
:start-after: [START cancel_schedule_messages]
:end-before: [END cancel_schedule_messages]
:language: python
:dedent: 4
:caption: Schedule messages.
"""
if not self.running:
await self.open()
numbers = [types.AMQPLong(s) for s in sequence_numbers]
request_body = {'sequence-numbers': types.AMQPArray(numbers)}
return await self._mgmt_request_response(
REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION,
request_body,
mgmt_handlers.default)
:end-before: [END receive_deferred_messages_service_bus]
:language: python
:dedent: 8
:caption: Get the messages which were deferred using their sequence numbers
"""
if (self.entity and self.requires_session) or kwargs.get('session'):
raise ValueError("Sessionful deferred messages can only be received within a locked receive session.")
if not sequence_numbers:
raise ValueError("At least one sequence number must be specified.")
try:
receive_mode = mode.value.value
except AttributeError:
receive_mode = int(mode)
message = {
'sequence-numbers': types.AMQPArray([types.AMQPLong(s) for s in sequence_numbers]),
'receiver-settle-mode': types.AMQPuInt(receive_mode)}
mgmt_handler = functools.partial(mgmt_handlers.deferred_message_op, mode=receive_mode)
with BaseHandler(self.entity_uri, self.auth_config, debug=self.debug, **kwargs) as handler:
return handler._mgmt_request_response( # pylint: disable=protected-access
REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER,
message,
mgmt_handler)
:param sequence_numbers: The seqeuence numbers of the scheduled messages.
:type sequence_numbers: int
Example:
.. literalinclude:: ../examples/test_examples.py
:start-after: [START cancel_scheduled_messages]
:end-before: [END cancel_scheduled_messages]
:language: python
:dedent: 4
:caption: Cancelling messages scheduled to be sent in future
"""
if not self.running:
self.open()
numbers = [types.AMQPLong(s) for s in sequence_numbers]
request_body = {'sequence-numbers': types.AMQPArray(numbers)}
return self._mgmt_request_response(
REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION,
request_body,
mgmt_handlers.default)
:param session: If the entity requires sessions, a session ID must be supplied
in order that only messages from that session will be browsed. If the entity
does not require sessions this value will be ignored.
:type session: str
:rtype: list[~azure.servicebus.common.message.PeekMessage]
Example:
.. literalinclude:: ../examples/test_examples.py
:start-after: [START peek_messages_service_bus]
:end-before: [END peek_messages_service_bus]
:language: python
:dedent: 4
:caption: Look at specificied number of messages without removing them from queue
"""
message = {'from-sequence-number': types.AMQPLong(start_from), 'message-count': int(count)}
if self.entity and self.requires_session:
if not session:
raise ValueError("Sessions are required, please set session.")
message['session-id'] = session
with BaseHandler(self.entity_uri, self.auth_config, debug=self.debug, **kwargs) as handler:
return handler._mgmt_request_response( # pylint: disable=protected-access
REQUEST_RESPONSE_PEEK_OPERATION,
message,
mgmt_handlers.peek_op)