Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:param str destination: Exchange name
:param str source: Exchange to unbind from
:param str routing_key: The routing key used
:param dict arguments: Unbind key/value arguments
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(destination):
raise AMQPInvalidArgument('destination should be a string')
elif not compatibility.is_string(source):
raise AMQPInvalidArgument('source should be a string')
elif not compatibility.is_string(routing_key):
raise AMQPInvalidArgument('routing_key should be a string')
elif arguments is not None and not isinstance(arguments, dict):
raise AMQPInvalidArgument('arguments should be a dict or None')
unbind_frame = pamqp_exchange.Unbind(destination=destination,
source=source,
routing_key=routing_key,
arguments=arguments)
return self._channel.rpc_request(unbind_frame)
def _validate_publish_parameters(body, exchange, immediate, mandatory,
properties, routing_key):
"""Validate Publish Parameters.
:param bytes|str|unicode body: Message payload
:param str routing_key: Message routing key
:param str exchange: The exchange to publish the message to
:param dict properties: Message properties
:param bool mandatory: Requires the message is published
:param bool immediate: Request immediate delivery
:raises AMQPInvalidArgument: Invalid Parameters
:return:
"""
if not compatibility.is_string(body):
raise AMQPInvalidArgument('body should be a string')
elif not compatibility.is_string(routing_key):
raise AMQPInvalidArgument('routing_key should be a string')
elif not compatibility.is_string(exchange):
raise AMQPInvalidArgument('exchange should be a string')
elif properties is not None and not isinstance(properties, dict):
raise AMQPInvalidArgument('properties should be a dict or None')
elif not isinstance(mandatory, bool):
raise AMQPInvalidArgument('mandatory should be a boolean')
elif not isinstance(immediate, bool):
raise AMQPInvalidArgument('immediate should be a boolean')
:param str queue: Queue name
:param str exchange: Exchange name
:param str routing_key: The routing key to use
:param dict arguments: Bind key/value arguments
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(queue):
raise AMQPInvalidArgument('queue should be a string')
elif not compatibility.is_string(exchange):
raise AMQPInvalidArgument('exchange should be a string')
elif not compatibility.is_string(routing_key):
raise AMQPInvalidArgument('routing_key should be a string')
elif arguments is not None and not isinstance(arguments, dict):
raise AMQPInvalidArgument('arguments should be a dict or None')
bind_frame = pamqp_queue.Bind(queue=queue,
exchange=exchange,
routing_key=routing_key,
arguments=arguments)
return self._channel.rpc_request(bind_frame)
"""Validate Publish Parameters.
:param bytes|str|unicode body: Message payload
:param str routing_key: Message routing key
:param str exchange: The exchange to publish the message to
:param dict properties: Message properties
:param bool mandatory: Requires the message is published
:param bool immediate: Request immediate delivery
:raises AMQPInvalidArgument: Invalid Parameters
:return:
"""
if not compatibility.is_string(body):
raise AMQPInvalidArgument('body should be a string')
elif not compatibility.is_string(routing_key):
raise AMQPInvalidArgument('routing_key should be a string')
elif not compatibility.is_string(exchange):
raise AMQPInvalidArgument('exchange should be a string')
elif properties is not None and not isinstance(properties, dict):
raise AMQPInvalidArgument('properties should be a dict or None')
elif not isinstance(mandatory, bool):
raise AMQPInvalidArgument('mandatory should be a boolean')
elif not isinstance(immediate, bool):
raise AMQPInvalidArgument('immediate should be a boolean')
:param bytes|str|unicode body: Message payload
:param str routing_key: Message routing key
:param str exchange: The exchange to publish the message to
:param dict properties: Message properties
:param bool mandatory: Requires the message is published
:param bool immediate: Request immediate delivery
:raises AMQPInvalidArgument: Invalid Parameters
:return:
"""
if not compatibility.is_string(body):
raise AMQPInvalidArgument('body should be a string')
elif not compatibility.is_string(routing_key):
raise AMQPInvalidArgument('routing_key should be a string')
elif not compatibility.is_string(exchange):
raise AMQPInvalidArgument('exchange should be a string')
elif properties is not None and not isinstance(properties, dict):
raise AMQPInvalidArgument('properties should be a dict or None')
elif not isinstance(mandatory, bool):
raise AMQPInvalidArgument('mandatory should be a boolean')
elif not isinstance(immediate, bool):
raise AMQPInvalidArgument('immediate should be a boolean')
:param str source: Exchange to unbind from
:param str routing_key: The routing key used
:param dict arguments: Unbind key/value arguments
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(destination):
raise AMQPInvalidArgument('destination should be a string')
elif not compatibility.is_string(source):
raise AMQPInvalidArgument('source should be a string')
elif not compatibility.is_string(routing_key):
raise AMQPInvalidArgument('routing_key should be a string')
elif arguments is not None and not isinstance(arguments, dict):
raise AMQPInvalidArgument('arguments should be a dict or None')
unbind_frame = pamqp_exchange.Unbind(destination=destination,
source=source,
routing_key=routing_key,
arguments=arguments)
return self._channel.rpc_request(unbind_frame)
:param str queue: Queue name
:param str consumer_tag: Consumer tag
:param bool no_local: Do not deliver own messages
:param bool no_ack: No acknowledgement needed
:param bool exclusive: Request exclusive access
:param dict arguments: Consume key/value arguments
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:returns: Consumer tag
:rtype: str
"""
if not compatibility.is_string(queue):
raise AMQPInvalidArgument('queue should be a string')
elif not compatibility.is_string(consumer_tag):
raise AMQPInvalidArgument('consumer_tag should be a string')
elif not isinstance(exclusive, bool):
raise AMQPInvalidArgument('exclusive should be a boolean')
elif not isinstance(no_ack, bool):
raise AMQPInvalidArgument('no_ack should be a boolean')
elif not isinstance(no_local, bool):
raise AMQPInvalidArgument('no_local should be a boolean')
elif arguments is not None and not isinstance(arguments, dict):
raise AMQPInvalidArgument('arguments should be a dict or None')
self._channel.consumer_callback = callback
consume_rpc_result = self._consume_rpc_request(arguments, consumer_tag,
exclusive, no_ack,
no_local, queue)
return self._consume_add_and_get_tag(consume_rpc_result)
def _validate_parameters(self):
"""Validate Connection Parameters.
:return:
"""
if not compatibility.is_string(self.parameters['hostname']):
raise AMQPInvalidArgument('hostname should be a string')
elif not compatibility.is_integer(self.parameters['port']):
raise AMQPInvalidArgument('port should be an integer')
elif not compatibility.is_string(self.parameters['username']):
raise AMQPInvalidArgument('username should be a string')
elif not compatibility.is_string(self.parameters['password']):
raise AMQPInvalidArgument('password should be a string')
elif not compatibility.is_string(self.parameters['virtual_host']):
raise AMQPInvalidArgument('virtual_host should be a string')
elif not isinstance(self.parameters['timeout'], (int, float)):
raise AMQPInvalidArgument('timeout should be an integer or float')
elif not compatibility.is_integer(self.parameters['heartbeat']):
raise AMQPInvalidArgument('heartbeat should be an integer')
:param bool no_local: Do not deliver own messages
:param bool no_ack: No acknowledgement needed
:param bool exclusive: Request exclusive access
:param dict arguments: Consume key/value arguments
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:returns: Consumer tag
:rtype: str
"""
if not compatibility.is_string(queue):
raise AMQPInvalidArgument('queue should be a string')
elif not compatibility.is_string(consumer_tag):
raise AMQPInvalidArgument('consumer_tag should be a string')
elif not isinstance(exclusive, bool):
raise AMQPInvalidArgument('exclusive should be a boolean')
elif not isinstance(no_ack, bool):
raise AMQPInvalidArgument('no_ack should be a boolean')
elif not isinstance(no_local, bool):
raise AMQPInvalidArgument('no_local should be a boolean')
elif arguments is not None and not isinstance(arguments, dict):
raise AMQPInvalidArgument('arguments should be a dict or None')
consume_rpc_result = self._consume_rpc_request(arguments, consumer_tag,
exclusive, no_ack,
no_local, queue)
tag = self._consume_add_and_get_tag(consume_rpc_result)
self._channel._consumer_callbacks[tag] = callback
return tag