Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_channel_build_empty_inbound_messages(self):
    # Nothing is fed to the channel, so draining the generator with
    # break_on_empty=True must not leave a message behind.
    chan = Channel(0, FakeConnection(), 360)
    chan.set_state(Channel.OPEN)
    drained = list(chan.build_inbound_messages(break_on_empty=True))
    # Equivalent to tracking the last yielded item, None if none came.
    last_message = drained[-1] if drained else None
    self.assertIsNone(last_message)
def test_channel_unhandled_frame(self):
    # No assertion on purpose: the test passes as long as on_frame()
    # accepts the fake frame without raising.
    lazy_connection = amqpstorm.Connection(
        'localhost',
        'guest',
        'guest',
        lazy=True,
    )
    chan = Channel(0, lazy_connection, rpc_timeout=360)
    frame = FakeFrame()
    chan.on_frame(frame)
def test_channel_cancel_ok_frame(self):
    # Register a consumer tag, deliver a CancelOk frame for that same
    # tag, and expect consumer_tags to be falsy afterwards.
    consumer_tag = 'hello-world'
    chan = Channel(0, None, rpc_timeout=360)
    chan.add_consumer_tag(consumer_tag)
    cancel_ok = specification.Basic.CancelOk(consumer_tag)
    chan.on_frame(cancel_ok)
    self.assertFalse(chan.consumer_tags)
def test_chanel_callback_not_set(self):
    # process_data_events() is called on a channel that never had a
    # consumer callback configured; it must raise AMQPChannelError with
    # an explanatory message.
    #
    # assertRaisesRegexp is a deprecated alias that was removed in
    # Python 3.12. The context-manager form of assertRaises plus a
    # substring check works on every Python version and is equivalent
    # here because the expected text has no regex metacharacters.
    channel = Channel(0, None, 360)
    with self.assertRaises(AMQPChannelError) as raised:
        channel.process_data_events()
    self.assertIn('no consumer_callback defined', str(raised.exception))
def test_channel_basic_return_frame(self):
    # A Basic.Return frame handed to on_frame() should be recorded in
    # channel.exceptions; the first entry's text is checked verbatim.
    lazy_connection = amqpstorm.Connection(
        'localhost',
        'guest',
        'guest',
        lazy=True,
    )
    chan = Channel(0, lazy_connection, rpc_timeout=360)
    returned = specification.Basic.Return(
        reply_code=500,
        reply_text='test',
        exchange='exchange',
        routing_key='routing_key',
    )
    chan.on_frame(returned)

    expected_text = ("Message not delivered: test (500) to queue "
                     "'routing_key' from exchange 'exchange'")
    first_error = chan.exceptions[0]
    self.assertEqual(str(first_error), expected_text)
def test_channel_check_error_no_exception(self):
    # An open channel with no queued exceptions: check_for_errors()
    # is expected to return without raising (no explicit assertion).
    fake_connection = FakeConnection()
    chan = Channel(0, fake_connection, 360)
    chan.set_state(Channel.OPEN)
    chan.check_for_errors()
def test_channel_build_message(self):
    # Seed the inbound queue with a deliver/header/body frame triple and
    # check that _build_message() returns a message carrying the body.
    payload = b'Hello World!'
    chan = Channel(0, None, 360)
    chan._inbound = [
        specification.Basic.Deliver(),
        ContentHeader(body_size=len(payload)),
        ContentBody(value=payload),
    ]
    built = chan._build_message()
    self.assertEqual(built._body, payload)
def test_channel_id(self):
    # int(channel) must give back the id the channel was created with.
    for channel_id in (0, 1557):
        chan = Channel(channel_id, None, 360)
        self.assertEqual(int(chan), channel_id)
def test_channel_consume_ok_frame(self):
    # After a ConsumeOk frame is processed, its tag should be the first
    # entry in channel.consumer_tags.
    consumer_tag = 'hello-world'
    chan = Channel(0, None, rpc_timeout=360)
    consume_ok = specification.Basic.ConsumeOk(consumer_tag)
    chan.on_frame(consume_ok)
    first_tag = chan.consumer_tags[0]
    self.assertEqual(first_tag, consumer_tag)
def test_channel_throw_exception_check_for_error(self):
    # Queue an AMQPConnectionError on an open channel and verify that
    # check_for_errors() raises that exception type.
    chan = Channel(0, FakeConnection(), 360)
    chan.set_state(chan.OPEN)
    pending_error = AMQPConnectionError('Test')
    chan.exceptions.append(pending_error)
    with self.assertRaises(AMQPConnectionError):
        chan.check_for_errors()