Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_message_auto_decode_enabled(self):
body = 'Hello World',
message = Message(body=body,
properties={'key': 'value',
'headers': {b'name': b'eandersson'}},
channel=None)
self.assertEqual(body, message.body)
self.assertIn('name', message.properties['headers'])
self.assertIn(b'name', message._properties['headers'])
self.assertIsInstance(message.properties['headers']['name'], str)
def test_message_update_property_with_decode(self):
message = Message(None, auto_decode=True)
message._update_properties('app_id', '123')
self.assertEqual(message.properties['app_id'], '123')
self.assertEqual(message._properties['app_id'], '123')
def test_message_dict(self):
body = b'Hello World'
properties = {'key': 'value'}
method = {b'alternative': 'value'}
message = Message(body=body,
properties=properties,
method=method,
channel=None)
result = dict(message)
self.assertIsInstance(result, dict)
self.assertEqual(result['body'], body)
self.assertEqual(result['properties'], properties)
self.assertEqual(result['method'], method)
channel = Channel(0, FakeConnection(), 360)
channel.set_state(Channel.OPEN)
message = b'Hello World!'
message_len = len(message)
deliver = specification.Basic.Deliver()
header = ContentHeader(body_size=message_len)
body = ContentBody(value=message)
channel._inbound = [deliver, header, body, deliver, header, body,
deliver, header, body, deliver, header, body]
index = 0
for message in channel.build_inbound_messages(break_on_empty=True):
self.assertIsInstance(message, Message)
index += 1
self.assertEqual(index, 4)
def test_message_auto_decode_when_properties_contains_dict(self):
prop_data = {
'hello': b'travis'
}
message = Message(body='Hello World',
properties={'key': prop_data},
channel=None)
self.assertIsInstance(message.properties['key'], dict)
self.assertEqual(prop_data['hello'].decode('utf-8'),
message.properties['key']['hello'])
def test_message_auto_decode_when_method_contains_list(self):
method_data = {'key': [b'a', b'b']}
message = Message(body='Hello World',
method=method_data,
channel=None)
self.assertEqual(method_data['key'][0].decode('utf-8'),
message.method['key'][0])
def test_message_auto_decode_when_method_is_none(self):
message = Message(body='Hello World',
method=None,
channel=None)
self.assertIsNone(message.method)
def test_channel_build_inbound_messages(self):
channel = Channel(0, FakeConnection(), 360)
channel.set_state(Channel.OPEN)
message = b'Hello World!'
message_len = len(message)
deliver = specification.Basic.Deliver()
header = ContentHeader(body_size=message_len)
body = ContentBody(value=message)
channel._inbound = [deliver, header, body]
for message in channel.build_inbound_messages(break_on_empty=True):
self.assertIsInstance(message, Message)
def publish_message():
with Connection('127.0.0.1', 'guest', 'guest') as connection:
with connection.channel() as channel:
# Declare the Queue, 'simple_queue'.
channel.queue.declare('simple_queue')
# Message Properties.
properties = {
'content_type': 'text/plain',
'headers': {'key': 'value'}
}
# Create the message.
message = Message.create(channel, 'Hello World!', properties)
# Publish the message to a queue called, 'simple_queue'.
message.publish('simple_queue')
def send_request(self, payload):
# Create the Message object.
message = Message.create(self.channel, payload)
message.reply_to = self.callback_queue
# Create an entry in our local dictionary, using the automatically
# generated correlation_id as our key.
self.queue[message.correlation_id] = None
# Publish the RPC request.
message.publish(routing_key=self.rpc_queue)
# Return the Unique ID used to identify the request.
return message.correlation_id