Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_rabbitmq_broker_stops_retrying_declaring_queues_when_max_attempts_reached(rabbitmq_broker):
# Given that I have a rabbit instance that lost its connection
with patch.object(rabbitmq_broker, "_declare_queue", side_effect=pika.exceptions.AMQPConnectionError):
# When I declare an actor
# Then a ConnectionClosed error should be raised
with pytest.raises(dramatiq.errors.ConnectionClosed):
@dramatiq.actor(queue_name="flaky_queue")
def do_work():
pass
body=message.encode(),
properties=properties,
)
self.emit_after("enqueue", message, delay)
return message
except (pika.exceptions.AMQPConnectionError,
pika.exceptions.AMQPChannelError) as e:
# Delete the channel and the connection so that the
# next caller/attempt may initiate new ones of each.
del self.channel
del self.connection
attempts += 1
if attempts > MAX_ENQUEUE_ATTEMPTS:
raise ConnectionClosed(e) from None
self.logger.debug(
"Retrying enqueue due to closed connection. [%d/%d]",
attempts, MAX_ENQUEUE_ATTEMPTS,
)
def nack(self, message):
try:
# Same deal as above.
self.broker.do_nack(self.queue_name, message.options["redis_message_id"])
except redis.ConnectionError as e:
raise ConnectionClosed(e) from None
finally:
self.message_refc -= 1
def __init__(self, parameters, queue_name, prefetch, timeout):
try:
self.logger = get_logger(__name__, type(self))
self.connection = pika.BlockingConnection(parameters=parameters)
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_count=prefetch)
self.iterator = self.channel.consume(queue_name, inactivity_timeout=timeout / 1000)
# We need to keep track of known delivery tags so that
# when connection errors occur and the consumer is reset,
# we don't attempt to send invalid tags to Rabbit since
# pika doesn't handle this very well.
self.known_tags = set()
except (pika.exceptions.AMQPConnectionError,
pika.exceptions.AMQPChannelError) as e:
raise ConnectionClosed(e) from None
self.queue_name,
self.prefetch - self.message_refc,
)
# Because we didn't get any messages, we should
# progressively long poll up to the idle timeout.
if not messages:
self.misses, backoff_ms = compute_backoff(self.misses, max_backoff=self.timeout)
time.sleep(backoff_ms / 1000)
return None
# Since we received some number of messages, we
# have to keep track of them.
self.message_refc += len(messages)
except redis.ConnectionError as e:
raise ConnectionClosed(e) from None
while not all_callbacks_handled.is_set():
self.connection.sleep(0)
except Exception:
self.logger.exception(
"Failed to wait for all callbacks to complete. This "
"can happen when the RabbitMQ server is suddenly "
"restarted."
)
try:
self.channel.close()
self.connection.close()
except (AssertionError,
pika.exceptions.AMQPConnectionError,
pika.exceptions.AMQPChannelError) as e:
raise ConnectionClosed(e) from None
self._declare_dq_queue(queue_name)
self.delay_queues.add(delayed_name)
self.emit_after("declare_delay_queue", delayed_name)
self._declare_xq_queue(queue_name)
break
except (pika.exceptions.AMQPConnectionError,
pika.exceptions.AMQPChannelError) as e: # pragma: no cover
# Delete the channel and the connection so that the next
# caller may initiate new ones of each.
del self.channel
del self.connection
attempts += 1
if attempts > MAX_DECLARE_ATTEMPTS:
raise ConnectionClosed(e) from None
self.logger.debug(
"Retrying declare due to closed connection. [%d/%d]",
attempts, MAX_DECLARE_ATTEMPTS,
)
def __next__(self):
try:
method, properties, body = next(self.iterator)
if method is None:
return None
message = Message.decode(body)
self.known_tags.add(method.delivery_tag)
return _RabbitmqMessage(method.delivery_tag, message)
except (AssertionError,
pika.exceptions.AMQPConnectionError,
pika.exceptions.AMQPChannelError) as e:
raise ConnectionClosed(e) from None
def ack(self, message):
try:
self.known_tags.remove(message._tag)
self.connection.add_callback_threadsafe(
partial(self.channel.basic_ack, message._tag),
)
except (pika.exceptions.AMQPConnectionError,
pika.exceptions.AMQPChannelError) as e:
raise ConnectionClosed(e) from None
except KeyError:
self.logger.warning("Failed to ack message: not in known tags.")
except Exception: # pragma: no cover
self.logger.warning("Failed to ack message.", exc_info=True)
def nack(self, message):
try:
self.known_tags.remove(message._tag)
self.connection.add_callback_threadsafe(
partial(self.channel.basic_nack, message._tag, requeue=False),
)
except (pika.exceptions.AMQPConnectionError,
pika.exceptions.AMQPChannelError) as e:
raise ConnectionClosed(e) from None
except KeyError:
self.logger.warning("Failed to nack message: not in known tags.")
except Exception: # pragma: no cover
self.logger.warning("Failed to nack message.", exc_info=True)