Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_channel_closed_after_connection_exception(self):
connection = amqpstorm.Connection('localhost', 'guest', 'guest',
lazy=True)
channel = Channel(0, connection, 360)
connection.exceptions.append(AMQPConnectionError('error'))
channel.set_state(channel.OPEN)
self.assertTrue(connection.is_closed)
self.assertTrue(channel.is_open)
self.assertRaisesRegexp(exception.AMQPConnectionError, 'error',
channel.check_for_errors)
self.assertTrue(channel.is_closed)
def test_connection_close_state(self):
connection = Connection('127.0.0.1', 'guest', 'guest', lazy=True)
connection.set_state(Connection.OPEN)
connection.close()
self.assertTrue(connection.is_closed)
def test_connection_heartbeat_stopped_on_close(self):
connection = Connection('127.0.0.1', 'guest', 'guest', timeout=1,
lazy=True)
connection.set_state(connection.OPEN)
connection.heartbeat.start(connection.exceptions)
connection.exceptions.append(AMQPConnectionError('error'))
self.assertFalse(connection.heartbeat._stopped.is_set())
self.assertRaises(AMQPConnectionError, connection.check_for_errors)
self.assertTrue(connection.heartbeat._stopped.is_set())
def test_channel0_on_close_frame(self):
connection = amqpstorm.Connection('localhost', 'guest', 'guest',
lazy=True)
connection.set_state(connection.OPEN)
channel = Channel0(connection)
self.assertFalse(connection.exceptions)
channel.on_frame(Connection.Close())
self.assertTrue(connection.exceptions)
self.assertTrue(connection.is_closed)
self.assertRaisesRegexp(AMQPConnectionError,
'Connection was closed by remote server: ',
connection.check_for_errors)
def test_channel_basic_return_frame(self):
connection = amqpstorm.Connection('localhost', 'guest', 'guest',
lazy=True)
channel = Channel(0, connection, rpc_timeout=360)
channel.on_frame(specification.Basic.Return(reply_code=500,
reply_text='test',
exchange='exchange',
routing_key='routing_key'))
self.assertEqual(str(channel.exceptions[0]),
"Message not delivered: test (500) to queue "
"'routing_key' from exchange 'exchange'")
def test_heartbeat_raise_when_check_for_life_when_exceptions_not_set(self):
heartbeat = Heartbeat(1)
heartbeat._beats_since_check = 0
heartbeat._last_heartbeat = time.time() - 100
# Normally the exception should be passed down to the list of
# exceptions in the connection, but if that list for some obscure
# reason is None, we should raise directly in _check_for_life_signs.
self.assertRaises(AMQPConnectionError, heartbeat._check_for_life_signs)
def ack(self, delivery_tag=None, multiple=False):
if delivery_tag is not None \
and not compatibility.is_integer(delivery_tag):
raise AMQPInvalidArgument('delivery_tag should be an integer '
'or None')
elif not isinstance(multiple, bool):
raise AMQPInvalidArgument('multiple should be a boolean')
self.channel.result.append((delivery_tag, multiple))
def test_io_default_ssl_version(self):
if hasattr(ssl, 'PROTOCOL_TLSv1_2'):
self.assertEqual(compatibility.DEFAULT_SSL_VERSION,
ssl.PROTOCOL_TLSv1_2)
else:
self.assertEqual(compatibility.DEFAULT_SSL_VERSION,
ssl.PROTOCOL_TLSv1)
def test_compatibility_long_integer(self):
x = long(1)
self.assertTrue(compatibility.is_integer(x))
def reject(self, delivery_tag=None, requeue=True):
if delivery_tag is not None \
and not compatibility.is_integer(delivery_tag):
raise AMQPInvalidArgument('delivery_tag should be an integer '
'or None')
elif not isinstance(requeue, bool):
raise AMQPInvalidArgument('requeue should be a boolean')
self.channel.result.append((delivery_tag, requeue))