Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def worker(*args, **kwargs):
try:
worker = Worker(*args, **kwargs)
worker.start()
yield worker
finally:
worker.stop()
def process_messages():
worker = dramatiq.Worker(rabbitmq_broker)
worker.start()
rabbitmq_broker.join(rabbitmq_random_queue)
worker.stop()
def setUp(self):
super().setUp()
self.broker = dramatiq.get_broker()
self.worker = dramatiq.Worker(self.broker, worker_timeout=100)
self.worker.start()
def test_rabbitmq_broker_can_enqueue_messages_with_priority(rabbitmq_broker):
max_priority = 10
message_processing_order = []
queue_name = "prioritized"
# Given that I have an actor that store priorities
@dramatiq.actor(queue_name=queue_name)
def do_work(message_priority):
message_processing_order.append(message_priority)
worker = Worker(rabbitmq_broker, worker_threads=1)
worker.queue_prefetch = 1
worker.start()
worker.pause()
try:
# When I send that actor messages with increasing priorities
for priority in range(max_priority):
do_work.send_with_options(args=(priority,), broker_priority=priority)
# And then tell the broker to wait for all messages
worker.resume()
rabbitmq_broker.join(queue_name, timeout=5000)
worker.join()
# I expect the stored priorities to be saved in decreasing order
assert message_processing_order == list(reversed(range(max_priority)))
def process_messages():
worker = dramatiq.Worker(redis_broker)
worker.start()
redis_broker.join(throughput.queue_name)
worker.stop()
def worker(broker):
worker = dramatiq.Worker(broker, worker_timeout=100)
worker.start()
yield worker
worker.stop()
def main(args):
broker = dramatiq.get_broker()
broker.emit_after("process_boot")
worker = dramatiq.Worker(broker, worker_threads=1)
worker.start()
while True:
try:
time.sleep(1)
except KeyboardInterrupt:
break
worker.stop()
broker.close()
def main(args):
broker = dramatiq.get_broker()
broker.emit_after("process_boot")
worker = dramatiq.Worker(broker, worker_threads=1)
worker.start()
while True:
try:
time.sleep(1)
except KeyboardInterrupt:
break
worker.stop()
broker.close()