Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_io_receive_raises_socket_error(self):
    """A socket.error raised by recv should be recorded as AMQPConnectionError."""
    # Prepare a socket stand-in whose recv() always fails.
    mock_socket = MagicMock(name='socket', spec=socket.socket)
    mock_socket.recv.side_effect = socket.error('error')

    io_handler = IO(FakeConnection().parameters)
    io_handler._exceptions = []
    io_handler.socket = mock_socket

    io_handler._receive()

    recorded = io_handler._exceptions
    self.assertIsInstance(recorded[0], AMQPConnectionError)
def test_io_receive_raises_socket_timeout(self):
    """A socket.timeout raised by recv must not propagate out of _receive."""
    # Prepare a socket stand-in whose recv() always times out.
    mock_socket = MagicMock(name='socket', spec=socket.socket)
    mock_socket.recv.side_effect = socket.timeout('timeout')

    io_handler = IO(FakeConnection().parameters)
    io_handler.socket = mock_socket

    # No explicit assertion: the test passes when the call returns normally.
    io_handler._receive()
def test_io_simple_send_zero_bytes_sent(self):
    """A send() that reports zero bytes should record an AMQPConnectionError."""
    # Socket stand-in that claims nothing was written.
    mock_socket = MagicMock(name='socket', spec=socket.socket)
    mock_socket.send.return_value = 0

    io_handler = IO(FakeConnection().parameters)
    io_handler._exceptions = []
    io_handler.socket = mock_socket
    io_handler.poller = MagicMock(name='poller', spec=amqpstorm.io.Poller)

    io_handler.write_to_socket('afasffa')

    recorded = io_handler._exceptions
    self.assertIsInstance(recorded[0], AMQPConnectionError)
def test_connection_fileno_property(self):
    """Connection.fileno should report the value of the IO socket's fileno()."""
    # Socket stand-in with a fixed descriptor number.
    mock_socket = MagicMock(name='socket', spec=socket.socket)
    mock_socket.fileno.return_value = 5

    connection = Connection('127.0.0.1', 'guest', 'guest', lazy=True)
    connection.set_state(connection.OPENING)

    # Swap in an IO object that wraps the mocked socket.
    io_handler = IO(connection.parameters, [])
    io_handler.socket = mock_socket
    connection._io = io_handler

    self.assertEqual(connection.fileno, 5)
def test_io_socket_close(self):
    """IO.close() should leave the socket attribute set to None."""
    io_handler = IO(FakeConnection().parameters)
    io_handler.socket = MagicMock(name='socket', spec=socket.socket)

    io_handler.close()

    self.assertIsNone(io_handler.socket)
def test_io_simple_send_with_error(self):
    """A socket.error raised by send should be recorded as AMQPConnectionError."""
    # Socket stand-in whose send() always fails.
    mock_socket = MagicMock(name='socket', spec=socket.socket)
    mock_socket.send.side_effect = socket.error('error')

    io_handler = IO(FakeConnection().parameters)
    io_handler._exceptions = []
    io_handler.socket = mock_socket
    io_handler.poller = MagicMock(name='poller', spec=amqpstorm.io.Poller)

    io_handler.write_to_socket('12345')

    recorded = io_handler._exceptions
    self.assertIsInstance(recorded[0], AMQPConnectionError)
def test_io_get_socket_address(self):
    """The first resolved address should carry the configured host and port."""
    fake_connection = FakeConnection()
    fake_connection.parameters['hostname'] = '127.0.0.1'
    fake_connection.parameters['port'] = 5672

    io_handler = IO(fake_connection.parameters)
    first_address = io_handler._get_socket_addresses()[0]

    # Element 4 of the address entry is compared against (host, port).
    self.assertEqual(first_address[4], ('127.0.0.1', 5672))
def test_connection_wait_for_connection(self):
    """_wait_for_connection_to_open should return once the state is OPEN.

    The connection starts in the OPENING state; a timer thread switches it
    to OPEN after one second while the main thread is waiting.
    """
    connection = Connection('127.0.0.1', 'guest', 'guest', timeout=5,
                            lazy=True)
    connection.set_state(connection.OPENING)
    io = IO(connection.parameters, [])
    io.socket = MagicMock(name='socket', spec=socket.socket)
    connection._io = io

    self.assertFalse(connection.is_open)

    def func(conn):
        conn.set_state(conn.OPEN)

    # Keep a reference to the timer so it can always be cleaned up.
    timer = threading.Timer(function=func, interval=1, args=(connection,))
    timer.start()
    try:
        connection._wait_for_connection_to_open()
    finally:
        # If the wait raised before the timer fired, stop it so the thread
        # does not outlive this test; join() is immediate otherwise.
        timer.cancel()
        timer.join()

    self.assertTrue(connection.is_open)
def test_io_create_socket(self):
    """_create_socket should return a socket object for the resolved family.

    The created socket is closed at the end of the test so no file
    descriptor is leaked.
    """
    connection = FakeConnection()
    io = IO(connection.parameters)
    addresses = io._get_socket_addresses()
    sock_address_tuple = addresses[0]
    # Element 0 of the address entry is passed on as the socket family.
    sock = io._create_socket(socket_family=sock_address_tuple[0])
    try:
        if hasattr(socket, 'socket'):
            self.assertIsInstance(sock, socket.socket)
        elif hasattr(socket, '_socketobject'):
            self.assertIsInstance(sock, socket._socketobject)
    finally:
        # Guarded so a failed type assertion is not masked by an
        # AttributeError when the returned object has no close().
        close = getattr(sock, 'close', None)
        if close is not None:
            close()
:return:
"""
super(Connection, self).__init__()
self.parameters = {
'hostname': hostname,
'username': username,
'password': password,
'port': port,
'virtual_host': kwargs.get('virtual_host', '/'),
'heartbeat': kwargs.get('heartbeat', 60),
'timeout': kwargs.get('timeout', 10),
'ssl': kwargs.get('ssl', False),
'ssl_options': kwargs.get('ssl_options', {})
}
self._validate_parameters()
self._io = IO(self.parameters, exceptions=self._exceptions,
on_read=self._read_buffer)
self._channel0 = Channel0(self)
self._channels = {}
self.heartbeat = Heartbeat(self.parameters['heartbeat'],
self._channel0.send_heartbeat)
if not kwargs.get('lazy', False):
self.open()