Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_channel_closed_after_connection_exception(self):
connection = amqpstorm.Connection('localhost', 'guest', 'guest',
lazy=True)
channel = Channel(0, connection, 360)
connection.exceptions.append(AMQPConnectionError('error'))
channel.set_state(channel.OPEN)
self.assertTrue(connection.is_closed)
self.assertTrue(channel.is_open)
self.assertRaisesRegexp(exception.AMQPConnectionError, 'error',
channel.check_for_errors)
self.assertTrue(channel.is_closed)
def test_connection_close_state(self):
connection = Connection('127.0.0.1', 'guest', 'guest', lazy=True)
connection.set_state(Connection.OPEN)
connection.close()
self.assertTrue(connection.is_closed)
def test_connection_heartbeat_stopped_on_close(self):
connection = Connection('127.0.0.1', 'guest', 'guest', timeout=1,
lazy=True)
connection.set_state(connection.OPEN)
connection.heartbeat.start(connection.exceptions)
connection.exceptions.append(AMQPConnectionError('error'))
self.assertFalse(connection.heartbeat._stopped.is_set())
self.assertRaises(AMQPConnectionError, connection.check_for_errors)
self.assertTrue(connection.heartbeat._stopped.is_set())
def test_channel0_on_close_frame(self):
connection = amqpstorm.Connection('localhost', 'guest', 'guest',
lazy=True)
connection.set_state(connection.OPEN)
channel = Channel0(connection)
self.assertFalse(connection.exceptions)
channel.on_frame(Connection.Close())
self.assertTrue(connection.exceptions)
self.assertTrue(connection.is_closed)
self.assertRaisesRegexp(AMQPConnectionError,
'Connection was closed by remote server: ',
connection.check_for_errors)
def test_channel_basic_return_frame(self):
connection = amqpstorm.Connection('localhost', 'guest', 'guest',
lazy=True)
channel = Channel(0, connection, rpc_timeout=360)
channel.on_frame(specification.Basic.Return(reply_code=500,
reply_text='test',
exchange='exchange',
routing_key='routing_key'))
self.assertEqual(str(channel.exceptions[0]),
"Message not delivered: test (500) to queue "
"'routing_key' from exchange 'exchange'")
def publish_message():
with Connection('127.0.0.1', 'guest', 'guest') as connection:
with connection.channel() as channel:
# Declare the Queue, 'simple_queue'.
channel.queue.declare('simple_queue')
# Message Properties.
properties = {
'content_type': 'text/plain',
'headers': {'key': 'value'}
}
# Create the message.
message = Message.create(channel, 'Hello World!', properties)
# Publish the message to a queue called, 'simple_queue'.
message.publish('simple_queue')
def consumer():
connection = Connection(HOST, USERNAME, PASSWORD, ssl=True, port=5671,
ssl_options={
'ssl_version': ssl.PROTOCOL_TLSv1,
'cert_reqs': ssl.CERT_NONE
})
channel = connection.channel()
channel.queue.declare('simple_queue')
channel.basic.publish(body='Hello World!', routing_key='simple_queue')
channel.close()
connection.close()
def consumer():
connection = Connection(HOST, USERNAME, PASSWORD)
channel = connection.channel()
# Declare a queue.
channel.queue.declare(QUEUE_NAME)
# Publish something we can get.
channel.basic.publish(body='Hello World!', routing_key=QUEUE_NAME,
properties={'content_type': 'text/plain'})
# Retrieve a single message.
result = channel.basic.get(queue=QUEUE_NAME, no_ack=False)
if result:
# If we got a message, handle it.
print('Message:', result['body'])
# Mark the message as handle.
"""
RPC Server example based on code from the official RabbitMQ Tutorial.
http://www.rabbitmq.com/tutorials/tutorial-six-python.html
"""
import amqpstorm
from amqpstorm import Message
CONNECTION = amqpstorm.Connection('127.0.0.1', 'guest', 'guest')
CHANNEL = CONNECTION.channel()
CHANNEL.queue.declare(queue='rpc_queue')
def fib(number):
if number == 0:
return 0
elif number == 1:
return 1
else:
return fib(number - 1) + fib(number - 2)
def on_request(message):
number = int(message.body)
def _create_connection(self):
"""Create a connection.
:return:
"""
attempts = 0
while True:
attempts += 1
if self._stopped.is_set():
break
try:
self._connection = Connection(self.hostname,
self.username,
self.password)
break
except amqpstorm.AMQPError as why:
LOGGER.warning(why)
if self.max_retries and attempts > self.max_retries:
raise Exception('max number of retries reached')
time.sleep(min(attempts * 2, 30))
except KeyboardInterrupt:
break