Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import pytest
import dramatiq
import dramatiq.broker
from dramatiq.brokers.rabbitmq import RabbitmqBroker
from dramatiq.middleware import Middleware
from .common import RABBITMQ_CREDENTIALS, skip_on_windows
class EmptyMiddleware(Middleware):
    """A middleware that overrides nothing from the ``Middleware`` base class."""
def test_broker_uses_rabbitmq_if_not_set():
    """Check that ``dramatiq.get_broker()`` yields a ``RabbitmqBroker``
    when no global broker has been set.
    """
    # Remember the currently installed broker so that clearing it below
    # does not leak modified module-level state into other tests.
    previous_broker = dramatiq.broker.global_broker
    try:
        # Given that no global broker is set
        dramatiq.broker.global_broker = None

        # If I try to get the global broker
        broker = dramatiq.get_broker()

        # I expect it to be a RabbitmqBroker instance
        assert isinstance(broker, RabbitmqBroker)
    finally:
        # Put back whatever was there before, pass or fail.
        dramatiq.broker.global_broker = previous_broker
@skip_on_windows
def test_broker_middleware_can_be_added_before_other_middleware(stub_broker):
def test_pipeline_does_not_continue_to_next_actor_when_message_is_marked_as_failed(stub_broker, stub_worker):
# Given that I have an actor that fails messages
class FailMessageMiddleware(middleware.Middleware):
def after_process_message(self, broker, message, *, result=None, exception=None):
message.fail()
stub_broker.add_middleware(FailMessageMiddleware())
has_run = False
@dramatiq.actor
def do_nothing():
pass
@dramatiq.actor
def should_never_run():
nonlocal has_run
has_run = True
else:
middleware = list(middleware)
if integration is not None:
assert SentryMiddleware not in (m.__class__ for m in middleware),\
"Sentry middleware must not be passed in manually to broker"
middleware.insert(0, SentryMiddleware())
kw['middleware'] = middleware
# raise Exception([args, kw])
original_broker__init__(self, *args, **kw)
Broker.__init__ = sentry_patched_broker__init__
class SentryMiddleware(Middleware):
    """Dramatiq middleware that captures exceptions and reports them
    to Sentry.

    Every broker that gets instantiated receives this middleware
    automatically through the DramatiqIntegration.
    """

    def before_process_message(self, broker, message):
        """Push a new Sentry scope for ``message`` and enter it.

        Does nothing when the DramatiqIntegration is not registered on
        the current hub.
        """
        current_hub = Hub.current
        if current_hub.get_integration(DramatiqIntegration) is None:
            return

        # The scope manager is stored on the message itself and entered
        # right away.
        message._scope_manager = current_hub.push_scope()
        message._scope_manager.__enter__()
if exception is not None:
status = Task.STATUS_FAILED
elif status is None:
status = Task.STATUS_DONE
LOGGER.debug("Updating Task from message %r.", message.message_id)
Task.tasks.create_or_update_from_message(
message,
status=status,
actor_name=message.actor_name,
queue_name=message.queue_name,
)
class DbConnectionsMiddleware(Middleware):
    """Middleware that cleans up db connections.

    ``db.close_old_connections()`` runs before and after every
    processed message; ``db.connections.close_all()`` runs on the
    consumer-thread, worker-thread and worker shutdown hooks.
    """

    def _close_old_connections(self, *args, **kwargs):
        """Call ``db.close_old_connections()``; hook arguments are ignored."""
        db.close_old_connections()

    # Both per-message hooks share the same handler.
    before_process_message = after_process_message = _close_old_connections

    def _close_connections(self, *args, **kwargs):
        """Call ``db.connections.close_all()``; hook arguments are ignored."""
        db.connections.close_all()

    # All three shutdown hooks share the same handler.
    before_consumer_thread_shutdown = \
        before_worker_thread_shutdown = \
        before_worker_shutdown = _close_connections
worker_timeout=self.worker_timeout,
)
consumer.start()
def _add_worker(self):
    """Build a ``_WorkerThread`` from this object's broker, consumers,
    work queue and worker timeout, start it, and record it in
    ``self.workers``.
    """
    thread_options = {
        "broker": self.broker,
        "consumers": self.consumers,
        "work_queue": self.work_queue,
        "worker_timeout": self.worker_timeout,
    }
    thread = _WorkerThread(**thread_options)
    # The thread is started before it is registered, as in the original.
    thread.start()
    self.workers.append(thread)
class _WorkerMiddleware(Middleware):
    """Middleware that adds a consumer to its worker whenever a queue
    or delay queue is declared.
    """

    def __init__(self, worker):
        """Store ``worker`` and set up a logger for this class."""
        self.worker = worker
        self.logger = get_logger(__name__, type(self))

    def after_declare_queue(self, broker, queue_name):
        """Add a consumer for the newly declared queue."""
        self.logger.debug(
            "Adding consumer for queue %r.", queue_name,
        )
        self.worker._add_consumer(queue_name)

    def after_declare_delay_queue(self, broker, queue_name):
        """Add a consumer, flagged ``delay=True``, for the newly declared delay queue."""
        self.logger.debug(
            "Adding consumer for delay queue %r.", queue_name,
        )
        self.worker._add_consumer(queue_name, delay=True)
class _ConsumerThread(Thread):
def __init__(self, *, broker, queue_name, prefetch, work_queue, worker_timeout):
super().__init__(daemon=True)
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ..logging import get_logger
from ..middleware import Middleware
#: Upper bound, in milliseconds, on how long results are allowed to
#: exist in the backend (10 minutes).
DEFAULT_RESULT_TTL = 10 * 60 * 1000
class Results(Middleware):
"""Middleware that automatically stores actor results.
Example:
>>> from dramatiq.results import Results
>>> from dramatiq.results.backends import RedisBackend
>>> backend = RedisBackend()
>>> broker.add_middleware(Results(backend=backend))
>>> @dramatiq.actor(store_results=True)
... def add(x, y):
... return x + y
>>> message = add.send(1, 2)
>>> message.get_result(backend=backend)
3
import logging
from django import db
from dramatiq.middleware import Middleware
LOGGER = logging.getLogger("django_dramatiq.AdminMiddleware")
class AdminMiddleware(Middleware):
"""This middleware keeps track of task executions.
"""
def after_enqueue(self, broker, message, delay):
from .models import Task
LOGGER.debug("Creating Task from message %r.", message.message_id)
status = Task.STATUS_ENQUEUED
if delay:
status = Task.STATUS_DELAYED
Task.tasks.create_or_update_from_message(
message,
status=status,
actor_name=message.actor_name,
queue_name=message.queue_name,