Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:param sequence_numbers: The sequence numbers of the scheduled messages.
:type sequence_numbers: int
Example:
.. literalinclude:: ../examples/test_examples.py
:start-after: [START cancel_scheduled_messages]
:end-before: [END cancel_scheduled_messages]
:language: python
:dedent: 4
:caption: Cancelling messages scheduled to be sent in future
"""
if not self.running:
self.open()
numbers = [types.AMQPLong(s) for s in sequence_numbers]
request_body = {'sequence-numbers': types.AMQPArray(numbers)}
return self._mgmt_request_response(
REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION,
request_body,
mgmt_handlers.default)
def _create_handler(self, auth: "JWTTokenAsync") -> None:
source = Source(self._source)
if self._offset is not None:
source.set_filter(
event_position_selector(self._offset, self._offset_inclusive)
)
desired_capabilities = None
if self._track_last_enqueued_event_properties:
symbol_array = [types.AMQPSymbol(RECEIVER_RUNTIME_METRIC_SYMBOL)]
desired_capabilities = utils.data_factory(types.AMQPArray(symbol_array))
properties = create_properties(
self._client._config.user_agent # pylint:disable=protected-access
)
self._handler = ReceiveClientAsync(
source,
auth=auth,
debug=self._client._config.network_tracing, # pylint:disable=protected-access
prefetch=self._prefetch,
link_properties=self._link_properties,
timeout=self._timeout,
idle_timeout=self._idle_timeout,
error_policy=self._retry_policy,
keep_alive_interval=self._keep_alive,
client_name=self._name,
receive_settle_mode=uamqp.constants.ReceiverSettleMode.ReceiveAndDelete,
def _create_handler(self, auth):
# type: (JWTTokenAuth) -> None
source = Source(self._source)
if self._offset is not None:
source.set_filter(
event_position_selector(self._offset, self._offset_inclusive)
)
desired_capabilities = None
if self._track_last_enqueued_event_properties:
symbol_array = [types.AMQPSymbol(RECEIVER_RUNTIME_METRIC_SYMBOL)]
desired_capabilities = utils.data_factory(types.AMQPArray(symbol_array))
properties = create_properties(
self._client._config.user_agent # pylint:disable=protected-access
)
self._handler = ReceiveClient(
source,
auth=auth,
debug=self._client._config.network_tracing, # pylint:disable=protected-access
prefetch=self._prefetch,
link_properties=self._link_properties,
timeout=self._timeout,
idle_timeout=self._idle_timeout,
error_policy=self._retry_policy,
keep_alive_interval=self._keep_alive,
client_name=self._name,
receive_settle_mode=uamqp.constants.ReceiverSettleMode.ReceiveAndDelete,
:start-after: [START receiver_defer_messages]
:end-before: [END receiver_defer_messages]
:language: python
:dedent: 8
:caption: Defer messages, then retrieve them by sequence number.
"""
if not sequence_numbers:
raise ValueError("At least one sequence number must be specified.")
await self._can_run()
try:
receive_mode = mode.value.value
except AttributeError:
receive_mode = int(mode)
message = {
'sequence-numbers': types.AMQPArray([types.AMQPLong(s) for s in sequence_numbers]),
'receiver-settle-mode': types.AMQPuInt(receive_mode)
}
handler = functools.partial(mgmt_handlers.deferred_message_op, mode=receive_mode, message_type=DeferredMessage)
messages = await self._mgmt_request_response(
REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER,
message,
handler)
for m in messages:
m._receiver = self # pylint: disable=protected-access
return messages
async def _settle_deferred(self, settlement, lock_tokens, dead_letter_details=None):
    """Send a disposition update for deferred messages via a management request.

    :param settlement: Value placed under the ``disposition-status`` key of
     the request body.
    :param lock_tokens: Lock tokens of the messages to settle; wrapped in an
     AMQP array and placed under the ``lock-tokens`` key.
    :param dead_letter_details: Optional extra entries merged into the request
     body. Ignored when ``None`` or otherwise falsy.
    :returns: The result of ``_mgmt_request_response`` for this request.
    """
    request_body = {}
    request_body['disposition-status'] = settlement
    request_body['lock-tokens'] = types.AMQPArray(lock_tokens)
    # Only merge the extra details when the caller actually supplied some.
    if dead_letter_details:
        request_body.update(dead_letter_details)
    response = await self._mgmt_request_response(
        REQUEST_RESPONSE_UPDATE_DISPOSTION_OPERATION,
        request_body,
        mgmt_handlers.default)
    return response
:param sequence_numbers: The sequence numbers of the scheduled messages.
:type sequence_numbers: int
Example:
.. literalinclude:: ../examples/async_examples/test_examples_async.py
:start-after: [START cancel_schedule_messages]
:end-before: [END cancel_schedule_messages]
:language: python
:dedent: 4
:caption: Cancelling messages scheduled to be sent in future
"""
if not self.running:
await self.open()
numbers = [types.AMQPLong(s) for s in sequence_numbers]
request_body = {'sequence-numbers': types.AMQPArray(numbers)}
return await self._mgmt_request_response(
REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION,
request_body,
mgmt_handlers.default)
:start-after: [START client_settle_deferred_messages]
:end-before: [END client_settle_deferred_messages]
:language: python
:dedent: 4
:caption: Settle deferred messages.
"""
if (self.entity and self.requires_session) or kwargs.get('session'):
raise ValueError("Sessionful deferred messages can only be settled within a locked receive session.")
if settlement.lower() not in ['completed', 'suspended', 'abandoned']:
raise ValueError("Settlement must be one of: 'completed', 'suspended', 'abandoned'")
if not messages:
raise ValueError("At least one message must be specified.")
message = {
'disposition-status': settlement.lower(),
'lock-tokens': types.AMQPArray([m.lock_token for m in messages])}
async with BaseHandler(
self.entity_uri, self.auth_config, loop=self.loop, debug=self.debug, **kwargs) as handler:
return await handler._mgmt_request_response( # pylint: disable=protected-access
REQUEST_RESPONSE_UPDATE_DISPOSTION_OPERATION,
message,
mgmt_handlers.default)