Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@rpc
def spam(self, ham):
    """Publish a message, store ``ham`` and return it with eggs appended.

    Calls ``self.publish`` with the literal ``"message"``, adds a
    ``FooModel`` built from ``ham`` to ``self.foo_session`` and commits
    (then flushes) the session.

    :param ham: value stored as ``FooModel.data``; must support ``+`` with
        a ``str`` for the return value to be built.
    :returns: ``ham + ' & eggs'``
    """
    self.publish("message")
    self.foo_session.add(FooModel(data=ham))
    self.foo_session.commit()
    self.foo_session.flush()
    return ham + ' & eggs'
def test_runner_catches_container_errors(runner_factory, rabbit_config):
    """Verify an error raised inside a container surfaces from runner.wait().

    ``RpcConsumer.handle_result`` is patched to raise, an RPC call is fired
    at the service, and the very same exception object is expected to come
    out of ``runner.wait()``.
    """
    runner = runner_factory(rabbit_config, ExampleService)
    runner.start()

    container = get_container(runner, ExampleService)

    rpc_consumer = get_extension(container, RpcConsumer)
    with patch.object(
            rpc_consumer, 'handle_result', autospec=True) as handle_result:
        exception = Exception("error")
        handle_result.side_effect = exception

        # use a standalone rpc proxy to call exampleservice.task()
        with ServiceRpcProxy("exampleservice", rabbit_config) as proxy:
            # proxy.task() will hang forever because it generates an error
            # in the remote container (so never receives a response).
            proxy.task.call_async()

            # verify that the error bubbles up to runner.wait()
            with pytest.raises(Exception) as exc_info:
                runner.wait()
            # must sit outside the ``raises`` block: nothing after the
            # raising call inside that block would ever run
            assert exc_info.value == exception
def test_upstream_blackhole(self, container, publish, toxiproxy):
    """ Verify we detect and recover from sockets losing data.

    This failure mode means that all data sent from the consumer to the
    rabbit broker is lost, but the socket remains open.

    Heartbeats sent from the consumer are not received by the broker. After
    two beats are missed the broker closes the connection, and subsequent
    reads from the socket raise a socket.error, so the connection is
    re-established.
    """
    queue_consumer = get_extension(container, QueueConsumer)

    def reset(args, kwargs, result, exc_info):
        # callback for patch_wait: undo the toxiproxy timeout once
        # ``on_connection_error`` has been called; returning True is
        # presumably what tells patch_wait to stop waiting
        toxiproxy.reset_timeout()
        return True

    with patch_wait(queue_consumer, 'on_connection_error', callback=reset):
        toxiproxy.set_timeout(timeout=0)

    # connection re-established
    msg = "foo"
    with entrypoint_waiter(container, 'echo') as result:
        publish(msg)
    assert result.get() == msg
def test_dependency_call_lifecycle_errors(
        container_factory, rabbit_config, method_name):
    """Verify an error in a dependency method surfaces from container.wait().

    The ``EventDispatcher`` method named by ``method_name`` is patched to
    raise ``Exception("error in <method_name>")``; after an RPC call is
    fired at the service, ``container.wait()`` must raise an exception
    carrying that message.
    """
    container = container_factory(ExampleService, rabbit_config)
    container.start()

    dependency = get_extension(container, EventDispatcher)
    with patch.object(dependency, method_name, autospec=True) as method:
        err = "error in {}".format(method_name)
        method.side_effect = Exception(err)

        # use a standalone rpc proxy to call exampleservice.task()
        with ServiceRpcProxy("exampleservice", rabbit_config) as proxy:
            # proxy.task() will hang forever because it generates an error
            proxy.task.call_async()

        # verify that the error bubbles up to container.wait()
        with pytest.raises(Exception) as exc_info:
            container.wait()
        # checked after the ``raises`` block, where exc_info is populated
        assert str(exc_info.value) == err
disconnected.wait()
return arg
return "duplicate-call-result"
@event_handler('srcservice', 'exampleevent')
def handle(self, evt_data):
    """Record the event and, the first time through, wait for a disconnect.

    ``handle_called`` is invoked with the event data on every call. If
    ``disconnect_now`` is not yet ready, it is sent ``True`` and the handler
    blocks on ``disconnected``; later calls skip that branch.

    :param evt_data: payload of the received 'exampleevent'.
    """
    handle_called(evt_data)
    if not disconnect_now.ready():
        # first delivery only: request the disconnect, then hold the
        # worker until the test signals that it has happened
        disconnect_now.send(True)
        disconnected.wait()
class ProxyService(object):
    """Service whose entrypoints call 'exampleservice' through an RpcProxy."""

    name = "proxyservice"

    # proxy used by both entrypoints to reach 'exampleservice'
    example_rpc = RpcProxy('exampleservice')

    @dummy
    def entrypoint(self, arg):
        """Return the result of calling ``method(arg)`` on the proxy."""
        return self.example_rpc.method(arg)

    @dummy
    def retry(self, arg):
        """Call ``method(arg)`` on the proxy until it succeeds.

        :returns: a list with one ``(exception type, message)`` tuple per
            failed attempt, followed by the result of the successful call.
        """
        results = []
        while True:
            try:
                results.append(self.example_rpc.method(arg))
                return results
            except Exception as ex:
                # record the failure and loop round for another attempt
                results.append((type(ex), str(ex)))
""" Verify we automatically recover from stale connections.
Publish confirms are required for this functionality. Without confirms
the later messages are silently lost and the test hangs waiting for a
response.
"""
assert service_rpc.echo(1) == 1
toxiproxy.disable()
def enable_after_retry(args, kwargs, res, exc_info):
toxiproxy.enable()
return True
# subsequent calls succeed (after reconnecting via retry policy)
with patch_wait(Connection, 'connect', callback=enable_after_retry):
assert service_rpc.echo(2) == 2
def test_down(self, container, publish, toxiproxy):
    """ Verify we detect and recover from closed sockets.

    This failure mode closes the socket between the consumer and the
    rabbit broker.

    Attempting to read from the closed socket raises a socket.error
    and the connection is re-established.
    """
    queue_consumer = get_extension(container, QueueConsumer)

    def reset(args, kwargs, result, exc_info):
        # callback for patch_wait: re-enable the proxy once
        # ``on_connection_error`` has been called; returning True is
        # presumably what tells patch_wait to stop waiting
        toxiproxy.enable()
        return True

    with patch_wait(queue_consumer, 'on_connection_error', callback=reset):
        toxiproxy.disable()

    # connection re-established
    msg = "foo"
    with entrypoint_waiter(container, 'echo') as result:
        publish(msg)
    assert result.get() == msg
with entrypoint_hook(publisher_container, 'send') as send:
send(payload1)
assert tracker.call_args_list == [
call("send", payload1),
call("recv", payload1),
]
toxiproxy.disable()
def enable_after_retry(args, kwargs, res, exc_info):
toxiproxy.enable()
return True
# call 2 succeeds (after reconnecting via retry policy)
with patch_wait(Connection, 'connect', callback=enable_after_retry):
payload2 = "payload2"
with entrypoint_waiter(consumer_container, 'recv'):
with entrypoint_hook(publisher_container, 'send') as send:
send(payload2)
assert tracker.call_args_list == [
call("send", payload1),
call("recv", payload1),
call("send", payload2),
call("recv", payload2),
]
""" Verify we automatically recover from stale connections.
Publish confirms are required for this functionality. Without confirms
the later messages are silently lost and the test hangs waiting for a
response.
"""
assert service_rpc.echo(1) == 1
toxiproxy.disable()
def enable_after_retry(args, kwargs, res, exc_info):
toxiproxy.enable()
return True
# call 2 succeeds (after reconnecting via retry policy)
with patch_wait(Connection, 'connect', callback=enable_after_retry):
assert service_rpc.echo(2) == 2
Attempting to read from the socket after it's closed raises a
socket.error and the connection will be re-established. If `timeout`
is longer than twice the heartbeat interval, the behaviour is the same
as in `test_downstream_blackhole` below, except that the consumer
cancel will eventually (`timeout` milliseconds) raise a socket.error,
which is ignored, allowing the teardown to continue.
See :meth:`kombu.messaging.Consumer.__exit__`
"""
queue_consumer = get_extension(container, QueueConsumer)
def reset(args, kwargs, result, exc_info):
toxiproxy.reset_timeout()
return True
with patch_wait(queue_consumer, 'on_connection_error', callback=reset):
toxiproxy.set_timeout(stream="downstream", timeout=100)
# connection re-established
assert service_rpc.echo("foo") == "foo"