Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(queue):
raise AMQPInvalidArgument('queue should be a string')
elif not isinstance(passive, bool):
raise AMQPInvalidArgument('passive should be a boolean')
elif not isinstance(durable, bool):
raise AMQPInvalidArgument('durable should be a boolean')
elif not isinstance(exclusive, bool):
raise AMQPInvalidArgument('exclusive should be a boolean')
elif not isinstance(auto_delete, bool):
raise AMQPInvalidArgument('auto_delete should be a boolean')
elif arguments is not None and not isinstance(arguments, dict):
raise AMQPInvalidArgument('arguments should be a dict or None')
declare_frame = pamqp_queue.Declare(queue=queue,
passive=passive,
durable=durable,
exclusive=exclusive,
auto_delete=auto_delete,
arguments=arguments)
return self._channel.rpc_request(declare_frame)
"""Validate Connection Parameters.
:return:
"""
if not compatibility.is_string(self.parameters['hostname']):
raise AMQPInvalidArgument('hostname should be a string')
elif not compatibility.is_integer(self.parameters['port']):
raise AMQPInvalidArgument('port should be an integer')
elif not compatibility.is_string(self.parameters['username']):
raise AMQPInvalidArgument('username should be a string')
elif not compatibility.is_string(self.parameters['password']):
raise AMQPInvalidArgument('password should be a string')
elif not compatibility.is_string(self.parameters['virtual_host']):
raise AMQPInvalidArgument('virtual_host should be a string')
elif not isinstance(self.parameters['timeout'], (int, float)):
raise AMQPInvalidArgument('timeout should be an integer or float')
elif not compatibility.is_integer(self.parameters['heartbeat']):
raise AMQPInvalidArgument('heartbeat should be an integer')
def cancel(self, consumer_tag=''):
"""Cancel a queue consumer.
:param str consumer_tag: Consumer tag
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(consumer_tag):
raise AMQPInvalidArgument('consumer_tag should be a string')
cancel_frame = specification.Basic.Cancel(consumer_tag=consumer_tag)
result = self._channel.rpc_request(cancel_frame)
self._channel.remove_consumer_tag(consumer_tag)
return result
def ack(self, delivery_tag=None, multiple=False):
"""Acknowledge Message.
:param int/long delivery_tag: Server-assigned delivery tag
:param bool multiple: Acknowledge multiple messages
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:return:
"""
if delivery_tag is not None \
and not compatibility.is_integer(delivery_tag):
raise AMQPInvalidArgument('delivery_tag should be an integer '
'or None')
elif not isinstance(multiple, bool):
raise AMQPInvalidArgument('multiple should be a boolean')
ack_frame = specification.Basic.Ack(delivery_tag=delivery_tag,
multiple=multiple)
self._channel.write_frame(ack_frame)
"""Close Channel.
:param int reply_code: Close reply code (e.g. 200)
:param str reply_text: Close reply text
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:return:
"""
if not compatibility.is_integer(reply_code):
raise AMQPInvalidArgument('reply_code should be an integer')
elif not compatibility.is_string(reply_text):
raise AMQPInvalidArgument('reply_text should be a string')
try:
if self._connection.is_closed or self.is_closed:
self.stop_consuming()
LOGGER.debug('Channel #%d forcefully Closed', self.channel_id)
return
self.set_state(self.CLOSING)
LOGGER.debug('Channel #%d Closing', self.channel_id)
try:
self.stop_consuming()
except AMQPChannelError:
self.remove_consumer_tag()
self.rpc_request(specification.Channel.Close(
reply_code=reply_code,
reply_text=reply_text),
connection_adapter=self._connection
)
:return:
"""
if not compatibility.is_string(self.parameters['hostname']):
raise AMQPInvalidArgument('hostname should be a string')
elif not compatibility.is_integer(self.parameters['port']):
raise AMQPInvalidArgument('port should be an integer')
elif not compatibility.is_string(self.parameters['username']):
raise AMQPInvalidArgument('username should be a string')
elif not compatibility.is_string(self.parameters['password']):
raise AMQPInvalidArgument('password should be a string')
elif not compatibility.is_string(self.parameters['virtual_host']):
raise AMQPInvalidArgument('virtual_host should be a string')
elif not isinstance(self.parameters['timeout'], (int, float)):
raise AMQPInvalidArgument('timeout should be an integer or float')
elif not compatibility.is_integer(self.parameters['heartbeat']):
raise AMQPInvalidArgument('heartbeat should be an integer')
:param bool auto_decode: Auto-decode strings when possible.
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:returns: Returns a single message, as long as there is a message in
the queue. If no message is available, returns None.
:rtype: dict|Message|None
"""
if not compatibility.is_string(queue):
raise AMQPInvalidArgument('queue should be a string')
elif not isinstance(no_ack, bool):
raise AMQPInvalidArgument('no_ack should be a boolean')
elif self._channel.consumer_tags:
raise AMQPChannelError("Cannot call 'get' when channel is "
"set to consume")
get_frame = specification.Basic.Get(queue=queue,
no_ack=no_ack)
with self._channel.lock and self._channel.rpc.lock:
message = self._get_message(get_frame, auto_decode=auto_decode)
if message and to_dict:
return message.to_dict()
return message
def _validate_parameters(self):
"""Validate Connection Parameters.
:return:
"""
if not compatibility.is_string(self.parameters['hostname']):
raise AMQPInvalidArgument('hostname should be a string')
elif not compatibility.is_integer(self.parameters['port']):
raise AMQPInvalidArgument('port should be an integer')
elif not compatibility.is_string(self.parameters['username']):
raise AMQPInvalidArgument('username should be a string')
elif not compatibility.is_string(self.parameters['password']):
raise AMQPInvalidArgument('password should be a string')
elif not compatibility.is_string(self.parameters['virtual_host']):
raise AMQPInvalidArgument('virtual_host should be a string')
elif not isinstance(self.parameters['timeout'], (int, float)):
raise AMQPInvalidArgument('timeout should be an integer or float')
elif not compatibility.is_integer(self.parameters['heartbeat']):
raise AMQPInvalidArgument('heartbeat should be an integer')
def _validate_parameters(self):
"""Validate Connection Parameters.
:return:
"""
if not compatibility.is_string(self.parameters['hostname']):
raise AMQPInvalidArgument('hostname should be a string')
elif not compatibility.is_integer(self.parameters['port']):
raise AMQPInvalidArgument('port should be an integer')
elif not compatibility.is_string(self.parameters['username']):
raise AMQPInvalidArgument('username should be a string')
elif not compatibility.is_string(self.parameters['password']):
raise AMQPInvalidArgument('password should be a string')
elif not compatibility.is_string(self.parameters['virtual_host']):
raise AMQPInvalidArgument('virtual_host should be a string')
elif not isinstance(self.parameters['timeout'], (int, float)):
raise AMQPInvalidArgument('timeout should be an integer or float')
elif not compatibility.is_integer(self.parameters['heartbeat']):
raise AMQPInvalidArgument('heartbeat should be an integer')
:param bool if_unused: Delete only if unused
:param bool if_empty: Delete only if empty
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(queue):
raise AMQPInvalidArgument('queue should be a string')
elif not isinstance(if_unused, bool):
raise AMQPInvalidArgument('if_unused should be a boolean')
elif not isinstance(if_empty, bool):
raise AMQPInvalidArgument('if_empty should be a boolean')
delete_frame = pamqp_queue.Delete(queue=queue, if_unused=if_unused,
if_empty=if_empty)
return self._channel.rpc_request(delete_frame)