Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_runner_catches_container_errors(runner_factory, rabbit_config):
runner = runner_factory(rabbit_config, ExampleService)
runner.start()
container = get_container(runner, ExampleService)
rpc_consumer = get_extension(container, RpcConsumer)
with patch.object(
rpc_consumer, 'handle_result', autospec=True) as handle_result:
exception = Exception("error")
handle_result.side_effect = exception
# use a standalone rpc proxy to call exampleservice.task()
with ServiceRpcProxy("exampleservice", rabbit_config) as proxy:
# proxy.task() will hang forever because it generates an error
# in the remote container (so never receives a response).
proxy.task.call_async()
# verify that the error bubbles up to runner.wait()
with pytest.raises(Exception) as exc_info:
runner.wait()
assert exc_info.value == exception
def test_events(self, container_factory, rabbit_config):
from events import ServiceA, ServiceB
container_a = container_factory(ServiceA, rabbit_config)
container_b = container_factory(ServiceB, rabbit_config)
container_a.start()
container_b.start()
with ServiceRpcProxy('service_a', rabbit_config) as service_a_rpc:
with patch.object(ServiceB, 'handle_event') as handle_event:
with entrypoint_waiter(container_b, 'handle_event'):
service_a_rpc.dispatching_method()
assert handle_event.call_args_list == [call("payload")]
def method(self):
self.service_y.method()
class ServiceY(object):
name = "y"
capture = CaptureWorkerContext()
@rpc
def method(self):
pass
runner = runner_factory(rabbit_config, ServiceX, ServiceY)
runner.start()
with ServiceRpcProxy("x", rabbit_config) as service_x:
service_x.method()
call_ids = [worker_ctx.call_id for worker_ctx in worker_contexts]
assert call_ids == ["x.method.1", "y.method.2"]
def test_proxy_remote_error(container_factory, rabbit_config):
container = container_factory(FooService, rabbit_config)
container.start()
with ServiceRpcProxy("foobar", rabbit_config) as proxy:
with pytest.raises(RemoteError) as exc_info:
proxy.broken()
assert exc_info.value.exc_type == "ExampleError"
def test_timeout_not_needed(container_factory, rabbit_manager, rabbit_config):
container = container_factory(FooService, rabbit_config)
container.start()
with ServiceRpcProxy('foobar', rabbit_config, timeout=1) as proxy:
assert proxy.sleep() == 0
container1 = list(runner1.containers)[0]
container2 = list(runner2.containers)[0]
with entrypoint_waiter(container1, "handle"):
with entrypoint_waiter(container2, "handle"):
dispatch('srcservice', "testevent", event_data)
assert tracker.call_args_list == [call(event_data), call(event_data)]
# verify there are two consumers on the rpc queue
rpc_queue = rabbit_manager.get_queue(vhost, 'rpc-service')
assert rpc_queue['consumers'] == 2
# test rpc (only one service will respond)
arg = "arg"
with ServiceRpcProxy('service', rabbit_config) as proxy:
proxy.handle(arg)
assert tracker.call_args_list == [
call(event_data), call(event_data), call(arg)
]
def test_proxy_context_data(container_factory, rabbit_config):
container = container_factory(FooService, rabbit_config)
container.start()
context_data = {'language': 'en'}
with ServiceRpcProxy('foobar', rabbit_config, context_data) as foo:
assert foo.get_context_data('language') == 'en'
context_data = {'language': 'fr'}
with ServiceRpcProxy('foobar', rabbit_config, context_data) as foo:
assert foo.get_context_data('language') == 'fr'
def test_hello_world(self, container_factory, rabbit_config):
from helloworld import GreetingService
container = container_factory(GreetingService, rabbit_config)
container.start()
with ServiceRpcProxy('greeting_service', rabbit_config) as greet_rpc:
assert greet_rpc.hello("Matt") == "Hello, Matt!"
def test_proxy_manual_start_stop(container_factory, rabbit_config):
container = container_factory(FooService, rabbit_config)
container.start()
foobar_proxy = ServiceRpcProxy('foobar', rabbit_config)
foo = foobar_proxy.start()
assert foo.spam(ham='eggs') == 'eggs'
assert foo.spam(ham='eggs') == 'eggs' # test re-use
foobar_proxy.stop()
def __init__(self, service_name, *args, **kwargs):
super(ServiceRpcProxy, self).__init__(*args, **kwargs)
self._proxy = ServiceProxy(
self._worker_ctx, service_name, self._reply_listener)