Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_lifecycle(rabbit_manager, rabbit_config, mock_container):
    """Start a ``QueueConsumer`` on a mock container and publish a message.

    Binds a ``QueueConsumer`` to ``mock_container``, registers a
    ``MessageHandler`` provider, sets the consumer up and starts it, then
    publishes one message whose content type is the one the container
    accepts and waits on the handler.
    """
    container = mock_container
    container.shared_extensions = {}
    container.config = rabbit_config
    container.max_workers = 3
    container.spawn_managed_thread.side_effect = spawn_managed_thread
    content_type = 'application/data'
    container.accept = [content_type]

    queue_consumer = QueueConsumer().bind(container)

    handler = MessageHandler()
    queue_consumer.register_provider(handler)

    queue_consumer.setup()
    queue_consumer.start()

    # making sure the QueueConsumer uses the container to spawn threads
    container.spawn_managed_thread.assert_called_once_with(ANY)

    vhost = rabbit_config['vhost']
    rabbit_manager.publish(vhost, 'spam', '', 'shrub',
                           properties=dict(content_type=content_type))

    # NOTE(review): presumably blocks until the handler has received the
    # message published above -- confirm against MessageHandler.wait.
    message = handler.wait()
def test_error_stops_consumer_thread(mock_container):
    """Check that an error raised by ``drain_events`` surfaces from ``_gt``.

    ``Connection.drain_events`` is patched to raise ``Exception('test')``
    while the consumer is started; waiting on ``queue_consumer._gt``
    afterwards is expected to re-raise that same exception.
    """
    container = mock_container
    container.shared_extensions = {}
    container.config = {AMQP_URI_CONFIG_KEY: 'memory://'}
    container.max_workers = 3
    container.spawn_managed_thread = spawn_managed_thread

    queue_consumer = QueueConsumer().bind(container)
    queue_consumer.setup()

    handler = MessageHandler()
    queue_consumer.register_provider(handler)

    # Bound the start-up with a timeout so a consumer that never becomes
    # ready fails the test instead of hanging it.
    with eventlet.Timeout(TIMEOUT):
        with patch.object(
                Connection, 'drain_events', autospec=True) as drain_events:
            drain_events.side_effect = Exception('test')
            queue_consumer.start()

    with pytest.raises(Exception) as exc_info:
        queue_consumer._gt.wait()

    # Must sit outside the ``pytest.raises`` block: statements after the
    # raising call inside that block would never execute.
    assert exc_info.value.args == ('test',)
def test_upstream_timeout(self, container, toxiproxy):
    """ Verify we detect and recover from sockets timing out.

    This failure mode means that the socket between the consumer and the
    rabbit broker times out for `timeout` milliseconds and then closes.

    Attempting to read from the socket after it's closed raises a
    socket.error and the connection will be re-established. If `timeout`
    is longer than twice the heartbeat interval, the behaviour is the same
    as in `test_upstream_blackhole` below.
    """
    queue_consumer = get_extension(container, QueueConsumer)

    def reset(args, kwargs, result, exc_info):
        # Remove the injected timeout once `on_connection_error` has been
        # called, so the reconnection attempt can succeed.
        # NOTE(review): returning True presumably tells `patch_wait` to
        # stop waiting -- confirm against its callback contract.
        toxiproxy.reset_timeout()
        return True

    with patch_wait(queue_consumer, 'on_connection_error', callback=reset):
        toxiproxy.set_timeout(timeout=100)

    # connection re-established
    msg = "foo"
    with entrypoint_hook(container, 'echo') as echo:
        assert echo(msg) == msg
def queue_consumer():
    """Yield an autospec'd ``QueueConsumer`` that ``bind`` returns.

    While this generator is suspended at its ``yield``,
    ``QueueConsumer.bind`` is patched so that it returns the autospec
    replacement; the patch is undone when the generator is resumed and
    exits the ``with`` block.

    NOTE(review): presumably registered as a pytest fixture by a decorator
    that is not visible here -- confirm.
    """
    replacement = create_autospec(QueueConsumer)
    with patch.object(QueueConsumer, 'bind') as mock_ext:
        mock_ext.return_value = replacement
        # Yield inside the ``with`` so the patch stays active for the
        # consumer of this generator.
        yield replacement
def toxic_queue_consumer(self, toxiproxy):
    """Point ``QueueConsumer.amqp_uri`` at ``toxiproxy.uri`` while active.

    The class attribute is replaced for as long as this generator is
    suspended at its ``yield`` and restored when it is resumed.

    NOTE(review): presumably registered as a pytest fixture by a decorator
    that is not visible here -- confirm.
    """
    with patch.object(QueueConsumer, 'amqp_uri', new=toxiproxy.uri):
        yield
class Service(object):
    """Minimal service declaring two no-op ``@rpc`` methods."""

    name = "service"

    @rpc
    def foo(self):
        pass  # pragma: no cover

    @rpc
    def bar(self):
        pass  # pragma: no cover
# Build a container for ``Service`` and look up each extension it is
# expected to hold.
container = ServiceContainer(Service, rabbit_config)

rpc_consumer = get_extension(container, RpcConsumer)
queue_consumer = get_extension(container, QueueConsumer)
foo_rpc = get_extension(container, Rpc, method_name="foo")
bar_rpc = get_extension(container, Rpc, method_name="bar")

# The container must hold exactly these four extensions and nothing else.
extensions = container.extensions
assert extensions == {rpc_consumer, queue_consumer, foo_rpc, bar_rpc}
def fast_reconnects(self):
    """Patch ``QueueConsumer.establish_connection`` with a fast-retry one.

    The replacement opens a connection via ``create_connection`` and calls
    ``ensure_connection`` with ``interval_start`` and ``interval_step``
    both set to ``0.1``. The patch is active while this generator is
    suspended at its ``yield``.

    NOTE(review): presumably registered as a pytest fixture by a decorator
    that is not visible here -- confirm.
    """

    @contextmanager
    def establish_connection(self):
        # ``self`` here is the QueueConsumer instance the patched method is
        # invoked on, not the instance passed to ``fast_reconnects``.
        with self.create_connection() as conn:
            conn.ensure_connection(
                self.on_connection_error,
                self.connect_max_retries,
                interval_start=0.1,
                interval_step=0.1)
            yield conn

    with patch.object(
        QueueConsumer, 'establish_connection', new=establish_connection
    ):
        yield
def test_stop_while_starting(rabbit_config, mock_container):
started = Event()
container = mock_container
container.shared_extensions = {}
container.config = rabbit_config
container.max_workers = 3
container.spawn_managed_thread = spawn_managed_thread
class BrokenConnConsumer(QueueConsumer):
    """``QueueConsumer`` whose ``consume`` signals ``started`` first."""

    def consume(self, *args, **kwargs):
        """Signal ``started``, reset it, then defer to the parent class."""
        started.send(None)
        # kombu will retry again and again on broken connections
        # so we have to make sure the event is reset to allow consume
        # to be called again
        started.reset()
        return super(BrokenConnConsumer, self).consume(*args, **kwargs)
queue_consumer = BrokenConnConsumer().bind(container)
queue_consumer.setup()
handler = MessageHandler()
queue_consumer.register_provider(handler)
with eventlet.Timeout(TIMEOUT):
with patch.object(Connection, 'connect', autospec=True) as connect:
"Retrying in {} seconds."
.format(sanitize_url(self.amqp_uri), exc, interval))
def on_consume_ready(self, connection, channel, consumers, **kwargs):
    """ Kombu callback when consumers are ready to accept messages.

    Called after any (re)connection to the broker.

    ``connection``, ``channel``, ``consumers`` and any extra keyword
    arguments are accepted but not used here.
    """
    # Only signal the first time: this callback fires again after every
    # reconnection, and ``_consumers_ready`` may already be set by then.
    if not self._consumers_ready.ready():
        _log.debug('consumer started %s', self)
        self._consumers_ready.send(None)
class Consumer(Entrypoint, HeaderDecoder):
queue_consumer = QueueConsumer()
def __init__(self, queue, requeue_on_error=False, **kwargs):
"""
Decorates a method as a message consumer.
Messages from the queue will be deserialized depending on their content
type and passed to the the decorated method.
When the consumer method returns without raising any exceptions,
the message will automatically be acknowledged.
If any exceptions are raised during the consumption and
`requeue_on_error` is True, the message will be requeued.
If `requeue_on_error` is true, handlers will return the event to the
queue if an error occurs while handling it. Defaults to false.
Example::