How to use the pysoa.test.compatibility.mock function in pysoa

To help you get started, we’ve selected a few pysoa examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github eventbrite / pysoa / tests / unit / common / transport / redis_gateway / test_threading.py View on Github external
    @mock.patch(target='pysoa.common.transport.redis_gateway.client.get_hex_thread_id')
    def test_without_thread_id(self, mock_get_hex_thread_id):
        mock_get_hex_thread_id.return_value = ''

        server, client1, client2 = self._test()

        if server.error:
            raise server.error

        self.assertIsNotNone(client1.error)
        self.assertTrue(
            client1.error.args[0].startswith('Expected payload to be'),
            'Expected error message to start with "Expected payload to be," but instead got "{}"'.format(
                client1.error.args[0],
            ),
        )
github eventbrite / pysoa / tests / unit / common / transport / redis_gateway / backend / test_standard.py View on Github external
return lua_globals.tonumber(str(pval))

    raise RuntimeError('Invalid Python type: ' + str(type(pval)))


# Patch MockRedis for Python 3 compatibility and error_reply
Script._execute_lua = _execute_lua
Script._lua_to_python = _lua_to_python
Script._python_to_lua = _python_to_lua

#####
# End MockRedis fixes
#####


@mock.patch('redis.Redis', new=mockredis.mock_redis_client)
class TestStandardRedisClient(unittest.TestCase):
    @staticmethod
    def _set_up_client(**kwargs):
        return StandardRedisClient(
            hosts=[('169.254.7.12', 6379), ('169.254.8.12', 6379), ('169.254.9.12', 6379)],
            **kwargs
        )

    def test_invalid_hosts(self):
        with self.assertRaises(ValueError):
            StandardRedisClient(hosts='redis://localhost:1234/0')

    def test_simple_send_and_receive(self):
        client = self._set_up_client()

        payload = {'test': 'test_simple_send_receive'}
github eventbrite / pysoa / tests / unit / server / test_server / test_configuration.py View on Github external
def test_settings_middleware_instantiation(self):
        test_class = mock.MagicMock()
        test_kwargs = {
            'key': 'val',
        }
        self.settings['middleware'].append({
            'object': test_class,
            'kwargs': test_kwargs,
        })
        BaseTestServiceServer(self.settings)
        test_class.assert_called_once_with(**test_kwargs)
github eventbrite / pysoa / tests / unit / common / test_logging.py View on Github external
def test_filter(self):
        record = mock.MagicMock()

        log_filter = PySOALogContextFilter()

        self.assertTrue(log_filter.filter(record))
        self.assertEqual('--', record.correlation_id)
        self.assertEqual('--', record.request_id)
        self.assertEqual('unknown', record.service_name)

        PySOALogContextFilter.set_service_name('foo_qux')
        PySOALogContextFilter.set_logging_request_context(filter='mine', **{'logger': 'yours'})
        self.assertEqual({'filter': 'mine', 'logger': 'yours'}, PySOALogContextFilter.get_logging_request_context())

        record.reset_mock()

        self.assertTrue(log_filter.filter(record))
        self.assertEqual('--', record.correlation_id)
github eventbrite / pysoa / tests / unit / server / test_autoreloader.py View on Github external
with tempfile.NamedTemporaryFile('wb') as tmp_file1, tempfile.NamedTemporaryFile('wb') as tmp_file2, \
                codecs.StreamReaderWriter(tmp_file1, codec.streamreader, codec.streamwriter, 'strict') as file1, \
                codecs.StreamReaderWriter(tmp_file2, codec.streamreader, codec.streamwriter, 'strict') as file2:
            reloader = _PyInotifyReloader('example_service.standalone', ['pysoa'])
            reloader.watching = True

            pool = ThreadPool(processes=1)

            file1.write('test 1')
            file1.flush()

            file2.write('test 2')
            file2.flush()

            # noinspection PyUnresolvedReferences
            with mock.patch.object(target=reloader, attribute='get_watch_file_names') as mock_get_watch_file_names:
                mock_get_watch_file_names.return_value = [file1.name, file2.name]

                result = pool.apply_async(reloader.code_changed)
                self.assertFalse(result.ready())

                time.sleep(0.2)

                self.assertFalse(result.ready())

                file1.write('test changed 1')
                file1.flush()

                time.sleep(0.2)

                self.assertTrue(result.ready())
                self.assertTrue(result.get())
github eventbrite / pysoa / tests / unit / common / transport / redis_gateway / test_server.py View on Github external
from __future__ import (
    absolute_import,
    unicode_literals,
)

import random
import unittest

from pysoa.common.metrics import NoOpMetricsRecorder
from pysoa.common.transport.exceptions import InvalidMessageError
from pysoa.common.transport.redis_gateway.server import RedisServerTransport
from pysoa.test.compatibility import mock


@mock.patch('pysoa.common.transport.redis_gateway.server.RedisTransportServerCore')
class TestServerTransport(unittest.TestCase):
    @staticmethod
    def _get_transport(service='my_service', **kwargs):
        return RedisServerTransport(service, NoOpMetricsRecorder(), **kwargs)

    def test_core_args(self, mock_core):
        transport = self._get_transport(hello='world', goodbye='earth')

        mock_core.assert_called_once_with(
            service_name='my_service',
            hello='world',
            goodbye='earth',
            metrics=transport.metrics,
        )

        mock_core.reset_mock()
github eventbrite / pysoa / tests / unit / server / test_types.py View on Github external
def test_call_local_action_no_action(self):
        server = mock.MagicMock()
        server.action_class_map = {'unused_foo': mock.MagicMock()}

        r = EnrichedActionRequest(action='foo', body={'bar': 'baz'})
        r._server = server

        with pytest.raises(ActionError) as error_context:
            r.call_local_action('other_foo', {'color': 'red'})

        assert error_context.value.errors[0].code == 'UNKNOWN'
        assert error_context.value.errors[0].field == 'action'

        response = r.call_local_action('other_foo', {'color': 'red'}, raise_action_errors=False)
        assert response.action == 'other_foo'
        assert response.errors
        assert response.errors[0].code == 'UNKNOWN'
        assert response.errors[0].field == 'action'
github eventbrite / pysoa / tests / integration / test_send_receive.py View on Github external
def test_call_actions_parallel_with_transport_errors_caught(self):
        original_send = self.client.send_request
        side_effect_context = {'call': 0}
        error = MessageSendError('Hello!')

        def side_effect(*args, **kwargs):
            side_effect_context['call'] += 1
            if side_effect_context['call'] == 2:
                raise error
            return original_send(*args, **kwargs)

        with mock.patch.object(self.client, 'send_request') as mock_send_request:
            mock_send_request.side_effect = side_effect

            action_responses = self.client.call_actions_parallel(
                'error_service',
                [
                    ActionRequest(action='okay_action'),
                    ActionRequest(action='job_error'),
                    ActionRequest(action='okay_action'),
                ],
                timeout=2,
                catch_transport_errors=True,
            )

        self.assertIsNotNone(action_responses)

        action_responses_list = list(action_responses)
github eventbrite / pysoa / tests / unit / common / transport / redis_gateway / backend / test_sentinel.py View on Github external
    @mock.patch('redis.sentinel.Sentinel')
    def test_master_not_found_worked_after_retries(self, mock_sentinel):
        mock_sentinel.return_value.master_for.return_value = MockSentinelRedis()

        client = self._set_up_client(sentinel_failover_retries=2, sentinel_services=['service1', 'service2'])
        client.reset_clients()

        mock_sentinel.return_value.master_for.reset_mock()
        mock_sentinel.return_value.master_for.side_effect = (
            redis.sentinel.MasterNotFoundError,
            redis.sentinel.MasterNotFoundError,
            MockSentinelRedis(),
        )

        connection = client.get_connection('test_master_not_found_worked_after_retries')
        self.assertIsNotNone(connection)