Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_compatibility_long_integer(self):
x = long(1)
self.assertTrue(compatibility.is_integer(x))
def reject(self, delivery_tag=None, requeue=True):
if delivery_tag is not None \
and not compatibility.is_integer(delivery_tag):
raise AMQPInvalidArgument('delivery_tag should be an integer '
'or None')
elif not isinstance(requeue, bool):
raise AMQPInvalidArgument('requeue should be a boolean')
self.channel.result.append((delivery_tag, requeue))
def close(self, reply_code=200, reply_text=''):
"""Close Channel.
:param int reply_code: Close reply code (e.g. 200)
:param str reply_text: Close reply text
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:return:
"""
if not compatibility.is_integer(reply_code):
raise AMQPInvalidArgument('reply_code should be an integer')
elif not compatibility.is_string(reply_text):
raise AMQPInvalidArgument('reply_text should be a string')
try:
if self._connection.is_closed or self.is_closed:
self.stop_consuming()
LOGGER.debug('Channel #%d forcefully Closed', self.channel_id)
return
self.set_state(self.CLOSING)
LOGGER.debug('Channel #%d Closing', self.channel_id)
try:
self.stop_consuming()
except AMQPChannelError:
self.remove_consumer_tag()
self.rpc_request(specification.Channel.Close(
reply_code=reply_code,
def ack(self, delivery_tag=0, multiple=False):
"""Acknowledge Message.
:param int/long delivery_tag: Server-assigned delivery tag
:param bool multiple: Acknowledge multiple messages
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:return:
"""
if not compatibility.is_integer(delivery_tag):
raise AMQPInvalidArgument('delivery_tag should be an integer')
elif not isinstance(multiple, bool):
raise AMQPInvalidArgument('multiple should be a boolean')
ack_frame = specification.Basic.Ack(delivery_tag=delivery_tag,
multiple=multiple)
self._channel.write_frame(ack_frame)
def channel(self, rpc_timeout=60, lazy=False):
"""Open Channel.
:param int rpc_timeout: Timeout before we give up waiting for an RPC
response from the server.
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
"""
LOGGER.debug('Opening a new Channel')
if not compatibility.is_integer(rpc_timeout):
raise AMQPInvalidArgument('rpc_timeout should be an integer')
elif self.is_closed:
raise AMQPConnectionError('socket/connection closed')
with self.lock:
channel_id = self._get_next_available_channel_id()
channel = Channel(channel_id, self, rpc_timeout,
on_close_impl=self._cleanup_channel)
self._channels[channel_id] = channel
if not lazy:
channel.open()
LOGGER.debug('Channel #%d Opened', channel_id)
return self._channels[channel_id]
def reject(self, delivery_tag=0, requeue=True):
"""Reject Message.
:param int/long delivery_tag: Server-assigned delivery tag
:param bool requeue: Re-queue the message
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:return:
"""
if not compatibility.is_integer(delivery_tag):
raise AMQPInvalidArgument('delivery_tag should be an integer')
elif not isinstance(requeue, bool):
raise AMQPInvalidArgument('requeue should be a boolean')
reject_frame = specification.Basic.Reject(delivery_tag=delivery_tag,
requeue=requeue)
self._channel.write_frame(reject_frame)
def qos(self, prefetch_count=0, prefetch_size=0, global_=False):
"""Specify quality of service.
:param int prefetch_count: Prefetch window in messages
:param int/long prefetch_size: Prefetch window in octets
:param bool global_: Apply to entire connection
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_integer(prefetch_count):
raise AMQPInvalidArgument('prefetch_count should be an integer')
elif not compatibility.is_integer(prefetch_size):
raise AMQPInvalidArgument('prefetch_size should be an integer')
elif not isinstance(global_, bool):
raise AMQPInvalidArgument('global_ should be a boolean')
qos_frame = specification.Basic.Qos(prefetch_count=prefetch_count,
prefetch_size=prefetch_size,
global_=global_)
return self._channel.rpc_request(qos_frame)
def close(self, reply_code=200, reply_text=''):
"""Close Channel.
:param int reply_code: Close reply code (e.g. 200)
:param str reply_text: Close reply text
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:return:
"""
if not compatibility.is_integer(reply_code):
raise AMQPInvalidArgument('reply_code should be an integer')
elif not compatibility.is_string(reply_text):
raise AMQPInvalidArgument('reply_text should be a string')
try:
if self._connection.is_closed or self.is_closed:
self.stop_consuming()
LOGGER.debug('Channel #%d forcefully Closed', self.channel_id)
return
self.set_state(self.CLOSING)
LOGGER.debug('Channel #%d Closing', self.channel_id)
try:
self.stop_consuming()
except AMQPChannelError:
self.remove_consumer_tag()
self.rpc_request(specification.Channel.Close(
reply_code=reply_code,