Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_stateful_set_closed(self):
    # After switching to CLOSED, the is_closed flag must report True.
    instance = Stateful()
    instance.set_state(Stateful.CLOSED)
    self.assertTrue(instance.is_closed)
def test_stateful_set_open(self):
    # After switching to OPEN, the is_open flag must report True.
    instance = Stateful()
    instance.set_state(Stateful.OPEN)
    self.assertTrue(instance.is_open)
def test_stateful_set_opening(self):
    # After switching to OPENING, the is_opening flag must report True.
    instance = Stateful()
    instance.set_state(Stateful.OPENING)
    self.assertTrue(instance.is_opening)
__author__ = 'eandersson'
from amqpstorm import compatibility
from amqpstorm.base import Stateful
from amqpstorm.exception import AMQPInvalidArgument
class FakeConnection(Stateful):
    """Fake Connection for Unit-Testing.

    Frames handed to ``write_frame`` are recorded in ``frames_out``
    instead of being written anywhere, so tests can inspect them.
    """
    # Connection settings defined once on the class; nothing in this class
    # mutates them.
    parameters = {
        'hostname': 'localhost',
        'port': 1234,
        'heartbeat': 60,
        'timeout': 30,
        'ssl': False
    }

    def __init__(self, state=3):
        """
        :param int state: Initial state, passed straight to ``set_state``.
        """
        super(FakeConnection, self).__init__()
        # Keep the recorded frames per instance. A class-level list would be
        # shared by every FakeConnection, so frames written in one test
        # would show up in the next one.
        self.frames_out = []
        self.set_state(state)

    def write_frame(self, channel_id, frame_out):
        """Record an outgoing frame.

        :param channel_id: Channel id the frame was written on
        :param frame_out: Frame that would have been sent
        :return:
        """
        self.frames_out.append((channel_id, frame_out))
def test_stateful_default_is_closed(self):
    # A freshly constructed Stateful, with no set_state call, must report
    # is_closed.
    fresh = Stateful()
    self.assertTrue(fresh.is_closed)
def _wait_for_connection_state(self, state=Stateful.OPEN, rpc_timeout=30):
    """Block until the connection reaches the expected state.

    Polls ``current_state``; between polls it runs ``check_for_errors``
    and sleeps for ``IDLE_WAIT``.

    :param int state: State that we expect
    :param int rpc_timeout: Maximum time to wait, measured with
                            ``time.time()`` (seconds)

    :raises AMQPConnectionError: Raises if the expected state is not
                                 reached within rpc_timeout.
    :return:
    """
    # NOTE(review): check_for_errors() may raise as well - confirm which
    # exceptions against its definition.
    began = time.time()
    while self.current_state != state:
        self.check_for_errors()
        elapsed = time.time() - began
        if elapsed > rpc_timeout:
            raise AMQPConnectionError('Connection timed out')
        sleep(IDLE_WAIT)
def close(self):
"""Close connection.

Moves the connection to CLOSING (unless it is already closed), stops
the heartbeat, sends a close-connection request through channel 0 and
waits for the CLOSED state. Afterwards the remaining channels and the
IO layer are closed and the state is set to CLOSED.

NOTE(review): indentation was lost in this copy of the file; the nesting
described here follows the statement order - confirm against the
original source.

NOTE(review): an AMQPConnectionError raised inside the try block is
caught and ignored below - confirm which calls can still raise it.

:raises AMQPConnectionError: Raises if the connection
encountered an error.
:return:
"""
LOGGER.debug('Connection Closing')
# Only move to CLOSING when the connection is not already closed.
if not self.is_closed:
self.set_state(self.CLOSING)
self.heartbeat.stop()
try:
# The close request is only sent when the connection is not already
# closed and a socket is present.
if not self.is_closed and self.socket:
self._channel0.send_close_connection()
self._wait_for_connection_state(state=Stateful.CLOSED)
except AMQPConnectionError:
# Best effort: a connection error during the close handshake is ignored.
pass
finally:
# Cleanup that follows the close attempt: channels, IO, final state.
self._close_remaining_channels()
self._io.close()
self.set_state(self.CLOSED)
LOGGER.debug('Connection Closed')
def _close_connection_ok(self):
"""Connection CloseOk frame received.

Passes Stateful.CLOSED to _set_connection_state.

:return:
"""
self._set_connection_state(Stateful.CLOSED)
:param frame_in: Amqp frame.
:return:
"""
LOGGER.debug('Frame Received: %s', frame_in.name)
if frame_in.name == 'Heartbeat':
return
elif frame_in.name == 'Connection.Close':
self._close_connection(frame_in)
elif frame_in.name == 'Connection.CloseOk':
self._close_connection_ok()
elif frame_in.name == 'Connection.Blocked':
self._blocked_connection(frame_in)
elif frame_in.name == 'Connection.Unblocked':
self._unblocked_connection()
elif frame_in.name == 'Connection.OpenOk':
self._set_connection_state(Stateful.OPEN)
elif frame_in.name == 'Connection.Start':
self.server_properties = frame_in.server_properties
self._send_start_ok(frame_in)
elif frame_in.name == 'Connection.Tune':
self._send_tune_ok(frame_in)
self._send_open_connection()
else:
LOGGER.error('[Channel0] Unhandled Frame: %s', frame_in.name)
from amqpstorm import compatibility
from amqpstorm.base import IDLE_WAIT
from amqpstorm.base import Stateful
from amqpstorm.channel import Channel
from amqpstorm.channel0 import Channel0
from amqpstorm.exception import AMQPConnectionError
from amqpstorm.exception import AMQPInvalidArgument
from amqpstorm.heartbeat import Heartbeat
from amqpstorm.io import EMPTY_BUFFER
from amqpstorm.io import IO
LOGGER = logging.getLogger(__name__)
class Connection(Stateful):
"""RabbitMQ Connection."""
__slots__ = [
'heartbeat', 'parameters', '_channel0', '_channels', '_io'
]
def __init__(self, hostname, username, password, port=5672, **kwargs):
"""
:param str hostname: Hostname
:param str username: Username
:param str password: Password
:param int port: Server port
:param str virtual_host: Virtual host
:param int heartbeat: RabbitMQ Heartbeat interval
:param int|float timeout: Socket timeout
:param bool ssl: Enable SSL
:param dict ssl_options: SSL kwargs (from ssl.wrap_socket)