Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@mock.patch(target='pysoa.common.transport.redis_gateway.client.get_hex_thread_id')
def test_without_thread_id(self, mock_get_hex_thread_id):
    # With the client's thread-ID helper patched to return an empty string, the first
    # client is expected to end up with an "Expected payload to be..." error.
    mock_get_hex_thread_id.return_value = ''

    server, first_client, _second_client = self._test()

    # Any error recorded on the server side is re-raised unchanged.
    if server.error:
        raise server.error

    self.assertIsNotNone(first_client.error)

    actual_message = first_client.error.args[0]
    failure_hint = 'Expected error message to start with "Expected payload to be," but instead got "{}"'.format(
        actual_message,
    )
    self.assertTrue(actual_message.startswith('Expected payload to be'), failure_hint)
return lua_globals.tonumber(str(pval))
raise RuntimeError('Invalid Python type: ' + str(type(pval)))
# Patch MockRedis for Python 3 compatibility and error_reply
# Each assignment rebinds a private attribute on ``Script`` to the same-named replacement
# (``_execute_lua``, ``_lua_to_python``, ``_python_to_lua``). Those replacements are defined
# outside this excerpt -- presumably earlier in this module; confirm before moving these lines.
Script._execute_lua = _execute_lua
Script._lua_to_python = _lua_to_python
Script._python_to_lua = _python_to_lua
#####
# End MockRedis fixes
#####
@mock.patch('redis.Redis', new=mockredis.mock_redis_client)
class TestStandardRedisClient(unittest.TestCase):
@staticmethod
def _set_up_client(**kwargs):
    """Build a ``StandardRedisClient`` for three fixed hosts, forwarding any extra keyword arguments."""
    hosts = [
        ('169.254.7.12', 6379),
        ('169.254.8.12', 6379),
        ('169.254.9.12', 6379),
    ]
    return StandardRedisClient(hosts=hosts, **kwargs)
def test_invalid_hosts(self):
    # Passing a URL string as ``hosts`` must be rejected with a ValueError.
    url_instead_of_host_list = 'redis://localhost:1234/0'
    with self.assertRaises(ValueError):
        StandardRedisClient(hosts=url_instead_of_host_list)
def test_simple_send_and_receive(self):
client = self._set_up_client()
payload = {'test': 'test_simple_send_receive'}
def test_settings_middleware_instantiation(self):
    # A middleware entry appended to the settings should be called exactly once, with its
    # configured kwargs, when the server is constructed.
    middleware_class = mock.MagicMock()
    middleware_kwargs = {'key': 'val'}
    middleware_entry = {
        'object': middleware_class,
        'kwargs': middleware_kwargs,
    }
    self.settings['middleware'].append(middleware_entry)

    BaseTestServiceServer(self.settings)

    middleware_class.assert_called_once_with(**middleware_kwargs)
def test_filter(self):
    log_record = mock.MagicMock()
    context_filter = PySOALogContextFilter()

    # With nothing configured yet, the filter accepts the record and fills in placeholders.
    self.assertTrue(context_filter.filter(log_record))
    self.assertEqual('--', log_record.correlation_id)
    self.assertEqual('--', log_record.request_id)
    self.assertEqual('unknown', log_record.service_name)

    # Set a service name and a request context, then check the context reads back intact.
    PySOALogContextFilter.set_service_name('foo_qux')
    PySOALogContextFilter.set_logging_request_context(filter='mine', **{'logger': 'yours'})
    expected_context = {'filter': 'mine', 'logger': 'yours'}
    self.assertEqual(expected_context, PySOALogContextFilter.get_logging_request_context())

    # Filter the same (reset) record again; the correlation ID is still the placeholder.
    log_record.reset_mock()
    self.assertTrue(context_filter.filter(log_record))
    self.assertEqual('--', log_record.correlation_id)
with tempfile.NamedTemporaryFile('wb') as tmp_file1, tempfile.NamedTemporaryFile('wb') as tmp_file2, \
codecs.StreamReaderWriter(tmp_file1, codec.streamreader, codec.streamwriter, 'strict') as file1, \
codecs.StreamReaderWriter(tmp_file2, codec.streamreader, codec.streamwriter, 'strict') as file2:
reloader = _PyInotifyReloader('example_service.standalone', ['pysoa'])
reloader.watching = True
pool = ThreadPool(processes=1)
file1.write('test 1')
file1.flush()
file2.write('test 2')
file2.flush()
# noinspection PyUnresolvedReferences
with mock.patch.object(target=reloader, attribute='get_watch_file_names') as mock_get_watch_file_names:
mock_get_watch_file_names.return_value = [file1.name, file2.name]
result = pool.apply_async(reloader.code_changed)
self.assertFalse(result.ready())
time.sleep(0.2)
self.assertFalse(result.ready())
file1.write('test changed 1')
file1.flush()
time.sleep(0.2)
self.assertTrue(result.ready())
self.assertTrue(result.get())
from __future__ import (
absolute_import,
unicode_literals,
)
import random
import unittest
from pysoa.common.metrics import NoOpMetricsRecorder
from pysoa.common.transport.exceptions import InvalidMessageError
from pysoa.common.transport.redis_gateway.server import RedisServerTransport
from pysoa.test.compatibility import mock
@mock.patch('pysoa.common.transport.redis_gateway.server.RedisTransportServerCore')
class TestServerTransport(unittest.TestCase):
@staticmethod
def _get_transport(service='my_service', **kwargs):
    """Create a ``RedisServerTransport`` for ``service`` with a no-op metrics recorder and the given kwargs."""
    metrics_recorder = NoOpMetricsRecorder()
    return RedisServerTransport(service, metrics_recorder, **kwargs)
def test_core_args(self, mock_core):
    # Extra transport kwargs should reach the (mocked) core unchanged, alongside the service
    # name and the transport's own metrics recorder.
    transport = self._get_transport(hello='world', goodbye='earth')

    expected_core_kwargs = dict(
        service_name='my_service',
        hello='world',
        goodbye='earth',
        metrics=transport.metrics,
    )
    mock_core.assert_called_once_with(**expected_core_kwargs)

    mock_core.reset_mock()
def test_call_local_action_no_action(self):
    # The fake server's action map has no 'other_foo' entry, so calling it locally must fail.
    fake_server = mock.MagicMock()
    fake_server.action_class_map = {'unused_foo': mock.MagicMock()}

    request = EnrichedActionRequest(action='foo', body={'bar': 'baz'})
    request._server = fake_server

    # Default mode: the failure surfaces as a raised ActionError.
    with pytest.raises(ActionError) as error_context:
        request.call_local_action('other_foo', {'color': 'red'})

    raised_error = error_context.value.errors[0]
    assert raised_error.code == 'UNKNOWN'
    assert raised_error.field == 'action'

    # With raise_action_errors=False the same failure comes back on the response instead.
    response = request.call_local_action('other_foo', {'color': 'red'}, raise_action_errors=False)
    assert response.action == 'other_foo'
    assert response.errors
    returned_error = response.errors[0]
    assert returned_error.code == 'UNKNOWN'
    assert returned_error.field == 'action'
def test_call_actions_parallel_with_transport_errors_caught(self):
original_send = self.client.send_request
side_effect_context = {'call': 0}
error = MessageSendError('Hello!')
def side_effect(*args, **kwargs):
side_effect_context['call'] += 1
if side_effect_context['call'] == 2:
raise error
return original_send(*args, **kwargs)
with mock.patch.object(self.client, 'send_request') as mock_send_request:
mock_send_request.side_effect = side_effect
action_responses = self.client.call_actions_parallel(
'error_service',
[
ActionRequest(action='okay_action'),
ActionRequest(action='job_error'),
ActionRequest(action='okay_action'),
],
timeout=2,
catch_transport_errors=True,
)
self.assertIsNotNone(action_responses)
action_responses_list = list(action_responses)
@mock.patch('redis.sentinel.Sentinel')
def test_master_not_found_worked_after_retries(self, mock_sentinel):
    # ``master_for`` fails twice with MasterNotFoundError and then succeeds; with two
    # failover retries configured, the client should still hand back a connection.
    master_for = mock_sentinel.return_value.master_for
    master_for.return_value = MockSentinelRedis()

    client = self._set_up_client(sentinel_failover_retries=2, sentinel_services=['service1', 'service2'])
    client.reset_clients()

    master_for.reset_mock()
    not_found = redis.sentinel.MasterNotFoundError
    master_for.side_effect = (not_found, not_found, MockSentinelRedis())

    connection = client.get_connection('test_master_not_found_worked_after_retries')
    self.assertIsNotNone(connection)