Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import random
import time
import pytest
import dramatiq
from dramatiq.brokers.rabbitmq import RabbitmqBroker
from ..common import RABBITMQ_CREDENTIALS
# Broker shared by the benchmark actors below; it targets a RabbitMQ server on
# the local machine using the credentials imported from ..common.
broker = RabbitmqBroker(host="127.0.0.1", credentials=RABBITMQ_CREDENTIALS)
@dramatiq.actor(queue_name="benchmark-throughput", broker=broker)
def throughput():
    """Do nothing.

    The actor body is intentionally empty; it is registered on the
    "benchmark-throughput" queue of the module-level broker.
    """
@dramatiq.actor(queue_name="benchmark-fib", broker=broker)
def fib(n):
    """Compute the n-th Fibonacci number iteratively.

    Parameters:
      n(int): 1-based index into the sequence 1, 1, 2, 3, 5, ...
        Any value of 2 or less yields 1.

    Returns:
      int: The n-th Fibonacci number.
    """
    x, y = 1, 1
    while n > 2:
        # x holds fib(k), y holds fib(k - 1); advance both by one step.
        x, y = x + y, x
        n -= 1
    # The loop result was previously computed and then discarded.
    return x
def test_rabbitmq_broker_raises_an_error_if_given_invalid_parameter_combinations():
    # NOTE(review): a test with this exact name is defined again further down
    # in this file; a later definition replaces this one at import time.

    # Given that I have a RabbitmqBroker
    # When I try to give it both a connection URL and a list of connection parameters
    # Then a RuntimeError should be raised
    with pytest.raises(RuntimeError):
        RabbitmqBroker(
            url="amqp://127.0.0.1:5672",
            parameters=[dict(host="127.0.0.1", credentials=RABBITMQ_CREDENTIALS)],
        )

    # When I try to give it both a connection URL and pika connection parameters
    # Then a RuntimeError should be raised
    with pytest.raises(RuntimeError):
        RabbitmqBroker(host="127.0.0.1", url="amqp://127.0.0.1:5672")

    # When I try to give it both a list of parameters and individual flags
    # Then a RuntimeError should be raised
    with pytest.raises(RuntimeError):
        RabbitmqBroker(host="127.0.0.1", parameters=[dict(host="127.0.0.1")])
def test_rabbitmq_broker_raises_an_error_if_given_invalid_parameter_combinations():
    # NOTE(review): this test name is defined more than once in this file;
    # only the last definition survives import, so the copies should be merged.

    # Given that I have a RabbitmqBroker
    # When I try to give it both a connection URL and a list of connection parameters
    # Then a RuntimeError should be raised
    with pytest.raises(RuntimeError):
        RabbitmqBroker(
            url="amqp://127.0.0.1:5672",
            parameters=[dict(host="127.0.0.1", credentials=RABBITMQ_CREDENTIALS)],
        )

    # When I try to give it both a connection URL and pika connection parameters
    # Then a RuntimeError should be raised
    with pytest.raises(RuntimeError):
        RabbitmqBroker(host="127.0.0.1", url="amqp://127.0.0.1:5672")

    # When I try to give it both a list of parameters and individual flags
    # Then a RuntimeError should be raised
    with pytest.raises(RuntimeError):
        RabbitmqBroker(host="127.0.0.1", parameters=[dict(host="127.0.0.1")])
def test_rabbitmq_broker_can_be_passed_a_list_of_uri_for_failover():
    # The username and password are read from RABBITMQ_CREDENTIALS, the only
    # credential name this file imports from ..common.
    # NOTE(review): assumes RABBITMQ_CREDENTIALS is a pika PlainCredentials
    # (exposes .username / .password) -- confirm against ..common.
    username = RABBITMQ_CREDENTIALS.username
    password = RABBITMQ_CREDENTIALS.password

    # Given a list of RabbitMQ connection URIs, including an invalid one
    urls = [
        "amqp://127.0.0.1:55672",
        "amqp://%s:%s@127.0.0.1" % (username, password),
    ]

    # When I pass those URIs to RabbitMQ broker as a list
    broker = RabbitmqBroker(url=urls)

    # Then the broker should connect to the host that is up
    assert broker.connection
def test_rabbitmq_broker_can_be_passed_a_semicolon_separated_list_of_uris():
    # The username and password are read from RABBITMQ_CREDENTIALS, the only
    # credential name this file imports from ..common.
    # NOTE(review): assumes RABBITMQ_CREDENTIALS is a pika PlainCredentials
    # (exposes .username / .password) -- confirm against ..common.
    username = RABBITMQ_CREDENTIALS.username
    password = RABBITMQ_CREDENTIALS.password

    # Given a string with a list of RabbitMQ connection URIs, including an invalid one
    urls = "amqp://127.0.0.1:55672;amqp://%s:%s@127.0.0.1" % (username, password)

    # When I pass those URIs to RabbitMQ broker as a ;-separated string
    broker = RabbitmqBroker(url=urls)

    # Then the broker should connect to the host that is up
    assert broker.connection
def test_rabbitmq_broker_raises_an_error_if_given_invalid_parameter_combinations():
    # NOTE(review): earlier definitions of this same test name exist in this
    # file and are replaced by this one at import time; the copies should be
    # merged into a single test.

    # Given that I have a RabbitmqBroker
    # When I try to give it both a connection URL and a list of connection parameters
    # Then a RuntimeError should be raised
    with pytest.raises(RuntimeError):
        RabbitmqBroker(
            url="amqp://127.0.0.1:5672",
            parameters=[dict(host="127.0.0.1", credentials=RABBITMQ_CREDENTIALS)],
        )

    # When I try to give it both a connection URL and pika connection parameters
    # Then a RuntimeError should be raised
    with pytest.raises(RuntimeError):
        RabbitmqBroker(host="127.0.0.1", url="amqp://127.0.0.1:5672")

    # When I try to give it both a list of parameters and individual flags
    # Then a RuntimeError should be raised
    with pytest.raises(RuntimeError):
        RabbitmqBroker(host="127.0.0.1", parameters=[dict(host="127.0.0.1")])
def test_urlrabbitmq_creates_instances_of_rabbitmq_broker():
    # The username and password are read from RABBITMQ_CREDENTIALS, the only
    # credential name this file imports from ..common.
    # NOTE(review): assumes RABBITMQ_CREDENTIALS is a pika PlainCredentials
    # (exposes .username / .password) -- confirm against ..common.
    username = RABBITMQ_CREDENTIALS.username
    password = RABBITMQ_CREDENTIALS.password

    # Given a URL connection string
    url = "amqp://%s:%s@127.0.0.1:5672" % (username, password)

    # When I pass that to URLRabbitmqBroker
    broker = URLRabbitmqBroker(url)

    # Then I should get back a RabbitmqBroker
    assert isinstance(broker, RabbitmqBroker)
from dramatiq.brokers.redis import RedisBroker
from dramatiq.brokers.rabbitmq import RabbitmqBroker
# NOTE(review): logging, os, pylibmc and celery are used below but are not
# imported in the visible part of this file -- confirm they are in scope.
logger = logging.getLogger("example")

# Memcached key incremented by fib_bench each time it runs.
counter_key = "latench-bench-counter"
memcache_client = pylibmc.Client(["localhost"], binary=True)
memcache_pool = pylibmc.ClientPool(memcache_client, 8)

# Fixed seed so that random values are reproducible between runs.
random.seed(1337)

# The REDIS environment variable selects the backend used by both dramatiq
# and celery: "1" means Redis, anything else means RabbitMQ.
if os.getenv("REDIS") == "1":
    broker = RedisBroker()
    dramatiq.set_broker(broker)
    celery_app = celery.Celery(broker="redis:///")
else:
    broker = RabbitmqBroker()
    dramatiq.set_broker(broker)
    celery_app = celery.Celery(broker="amqp:///")
def fib_bench(n):
    """Compute a Fibonacci number and record the call in memcached.

    Parameters:
      n(int): Number of iteration steps; 0 (or less) yields 0, 1 yields 1,
        2 yields 1, 3 yields 2, and so on.

    Returns:
      int: The n-th Fibonacci number, counting from fib(0) == 0.
    """
    p, q = 0, 1
    while n > 0:
        p, q = q, p + q
        n -= 1

    # Bump the shared counter once per call, using a client borrowed from the
    # module-level pool.
    with memcache_pool.reserve() as client:
        client.incr(counter_key)

    return p
# NOTE(review): this is the body of a function whose `def` line is not visible
# here (it reads `host`, `port` and `kind` and ends in `return`), and its
# indentation has been lost -- restore both before use.
# Keep only the connection settings that were actually provided (not None), so
# that just those are forwarded to the broker constructor.
kwargs = [("host", host), ("port", port)]
kwargs = {k: v for k, v in kwargs if v is not None}
# Build the broker named by `kind`; each backend is imported only inside its
# own branch. The stub broker is created without any connection settings.
if kind == "stub":
from dramatiq.brokers.stub import StubBroker
broker = StubBroker()
elif kind == "redis":
from dramatiq.brokers.redis import RedisBroker
broker = RedisBroker(**kwargs)
elif kind == "rabbitmq":
from dramatiq.brokers.rabbitmq import RabbitmqBroker
broker = RabbitmqBroker(**kwargs)
else:
# Any other value of `kind` is rejected.
raise ValueError(f"invalid dramatiq broker: {kind}")
# Register the broker as dramatiq's global broker and hand it back to the caller.
dramatiq.set_broker(broker)
return broker
def URLRabbitmqBroker(url, *, middleware=None):
    """Alias for the RabbitMQ broker that takes a connection URL as a
    positional argument.

    Deprecated: emits a DeprecationWarning on every call and then
    delegates to ``RabbitmqBroker(url=..., middleware=...)``.

    Parameters:
      url(str): A connection string.
      middleware(list[Middleware]): The middleware to add to this
        broker.

    Returns:
      RabbitmqBroker: The broker built from the given URL.
    """
    # Imported locally because the visible part of this file has no
    # module-level `import warnings`.
    import warnings

    warnings.warn(
        "Use RabbitmqBroker with the 'url' parameter instead of URLRabbitmqBroker.",
        # stacklevel=2 attributes the warning to the caller of this alias.
        DeprecationWarning, stacklevel=2,
    )
    return RabbitmqBroker(url=url, middleware=middleware)