Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_pipelines_expose_completion_stats(stub_broker, stub_worker, result_backend):
    """Check that a pipeline's completion counters advance as its jobs finish.

    The three parameters are pytest fixtures; presumably they are supplied by
    the project's conftest (not visible here).
    """
    # Given a result backend
    # And a broker with the results middleware
    stub_broker.add_middleware(Results(backend=result_backend))

    # And an actor that waits some amount of time
    condition = Condition()

    @dramatiq.actor(store_results=True)
    def wait(n):
        time.sleep(n)
        # Wake the test body so it can inspect the pipeline's counters.
        with condition:
            condition.notify_all()
            return n

    # When I pipe some messages intended for that actor together and run the pipeline
    pipe = wait.message(1) | wait.message()
    pipe.run()

    # Then every time a job in the pipeline completes, the completed_count should increase
    for count in range(1, len(pipe) + 1):
        with condition:
            condition.wait(2)
            # The actor notifies before its return value is stored, so give
            # the worker a moment to persist the result.
            time.sleep(0.1)
            assert pipe.completed_count == count

    # Finally, the pipeline as a whole should report completion
    assert pipe.completed
def test_groups_execute_jobs_in_parallel(stub_broker, stub_worker, result_backend):
    """Check that the jobs of a group run concurrently rather than one by one."""
    # Given that I have a result backend
    stub_broker.add_middleware(Results(backend=result_backend))

    # And I have an actor that sleeps for 100ms
    @dramatiq.actor(store_results=True)
    def wait():
        time.sleep(0.1)

    # When I group multiple of these actors together and run them
    t = time.monotonic()
    g = group([wait.message() for _ in range(5)])
    g.run()

    # And wait on the group to complete
    results = list(g.get_results(block=True))

    # Then the total elapsed time should be less than 500ms
    # (five 100ms jobs run back to back would need more than that)
    assert time.monotonic() - t <= 0.5

    # And I should get back one result per job in the group
    assert len(results) == len(g)
def test_pipeline_results_respect_timeouts(stub_broker, stub_worker, result_backend):
    """Check that fetching pipeline results honours the given timeout."""
    # Given a broker whose results middleware writes to the result backend
    stub_broker.add_middleware(Results(backend=result_backend))

    # And an actor that sleeps for ``n`` seconds and then returns ``2 * n``
    @dramatiq.actor(store_results=True)
    def wait(n):
        time.sleep(n)
        doubled = n * 2
        return doubled

    # When three messages for that actor are chained and the pipeline is run
    head = wait.message(1)
    middle = wait.message()
    tail = wait.message()
    pipe = head | middle | tail
    pipe.run()

    # Then draining the results with a timeout shorter than the jobs need
    # should raise ResultTimeout
    with pytest.raises(ResultTimeout):
        list(pipe.get_results(block=True, timeout=1000))
def test_groups_can_time_out(stub_broker, stub_worker, result_backend):
    """Check that waiting on a group with too short a timeout raises ResultTimeout."""
    # Given that I have a result backend
    stub_broker.add_middleware(Results(backend=result_backend))

    # And I have an actor that sleeps for 300ms
    @dramatiq.actor(store_results=True)
    def wait():
        time.sleep(0.3)

    # When I group a few jobs together and run it
    g = group(wait.message() for _ in range(2))
    g.run()

    # And wait for the group to complete with a timeout
    # Then a ResultTimeout error should be raised
    with pytest.raises(ResultTimeout):
        g.wait(timeout=100)

    # And the group should not be completed
    assert not g.completed
def test_groups_expose_completion_stats(stub_broker, stub_worker, result_backend):
    """Check that a group's completion counters advance as its jobs finish."""
    # Given that I have a result backend
    stub_broker.add_middleware(Results(backend=result_backend))

    # And an actor that waits some amount of time
    condition = Condition()

    @dramatiq.actor(store_results=True)
    def wait(n):
        time.sleep(n)
        # Wake the test body so it can inspect the group's counters.
        with condition:
            condition.notify_all()
            return n

    # When I group messages of varying durations together and run the group
    g = group(wait.message(n) for n in range(1, 4))
    g.run()

    # Then every time a job in the group completes, the completed_count should increase
    for count in range(1, len(g) + 1):
        with condition:
            condition.wait(5)
            # The actor notifies before its return value is stored, so give
            # the worker a moment to persist the result.
            time.sleep(0.1)
            assert g.completed_count == count

    # Finally, the group as a whole should report completion
    assert g.completed
def test_actors_can_store_results(stub_broker, stub_worker, result_backend):
    """Check that an actor's return value can be read back from the result backend."""
    # Given a broker whose results middleware writes to the result backend
    results_middleware = Results(backend=result_backend)
    stub_broker.add_middleware(results_middleware)

    # And an actor that stores its return value
    @dramatiq.actor(store_results=True)
    def do_work():
        return 42

    # When a message is sent to that actor
    sent = do_work.send()

    # Then blocking on the backend should yield what the actor returned
    assert result_backend.get_result(sent, block=True) == 42
def test_pipe_ignore_applies_to_receiving_message(stub_broker, stub_worker, result_backend):
    """Check that ``pipe_ignore`` stops a message from receiving the previous result."""
    # Given a result backend
    # And a broker with the results middleware
    stub_broker.add_middleware(Results(backend=result_backend))

    # And an actor that returns whatever positional arguments it was given
    @dramatiq.actor(store_results=True)
    def return_args(*args):
        return args

    # When I compose pipe of three messages with pipe_ignore option on second message
    pipe = (
        return_args.message(1) |
        return_args.message_with_options(pipe_ignore=True, args=(2, )) |
        return_args.message(3)
    )

    # And then run and wait for it to complete
    pipe.run()
    stub_broker.join(return_args.queue_name)
    results = list(pipe.get_results())

    # Then the second message should not have received the first one's result,
    # while the third should have received the second's.  Each result is
    # converted to a list so the comparison does not depend on whether the
    # encoder in use round-trips tuples as tuples or as lists.
    assert [list(result) for result in results] == [[1], [2], [3, [2]]]
def test_pipeline_results_can_be_retrieved(stub_broker, stub_worker, result_backend):
    """Check that both the final and the intermediate pipeline results are readable."""
    # Given a broker whose results middleware writes to the result backend
    stub_broker.add_middleware(Results(backend=result_backend))

    # And an actor that stores the sum of its two arguments
    @dramatiq.actor(store_results=True)
    def add(x, y):
        total = x + y
        return total

    # When a message is piped into a nested pipeline and the whole thing is run
    head = add.message(1, 2)
    tail = add.message(3) | add.message(4)
    pipe = head | tail
    pipe.run()

    # Then the pipeline's result should be the sum of 1, 2, 3 and 4
    final = pipe.get_result(block=True)
    assert final == 10

    # And each step's running total should be retrievable in order
    partials = list(pipe.get_results())
    assert partials == [3, 6, 10]
import argparse
import random
import sys
import time
import dramatiq
from dramatiq.brokers.rabbitmq import RabbitmqBroker
from dramatiq.encoder import PickleEncoder
from dramatiq.results import Results
from dramatiq.results.backends import RedisBackend
# Module-level wiring: a RabbitMQ broker whose Results middleware stores actor
# return values in Redis.
broker = RabbitmqBroker()

# NOTE(review): results are serialised with pickle, and unpickling runs
# arbitrary code -- presumably the Redis instance is trusted; confirm before
# reusing this setup elsewhere.
result_backend = RedisBackend(encoder=PickleEncoder())

broker.add_middleware(Results(backend=result_backend))

# Make this broker the global default.
dramatiq.set_broker(broker)
@dramatiq.actor(store_results=True)
def sleep_then_add(t, x, y):
    """Sleep for ``t`` seconds, then return ``x + y``.

    Declared with ``store_results=True`` so the return value is stored.
    """
    time.sleep(t)
    total = x + y
    return total
def main(args):
parser = argparse.ArgumentParser()
parser.add_argument("count", type=int, help="the number of messages to enqueue")
args = parser.parse_args()
messages = []
for _ in range(args.count):
timeout(int): The maximum amount of time, in ms, to block
while waiting for a result.
Raises:
RuntimeError: If there is no result backend on the default
broker.
ResultMissing: When block is False and the result isn't set.
ResultTimeout: When waiting for a result times out.
Returns:
object: The result.
"""
if not backend:
broker = get_broker()
for middleware in broker.middleware:
if isinstance(middleware, Results):
backend = middleware.backend
break
else:
raise RuntimeError("The default broker doesn't have a results backend.")
return backend.get_result(self, block=block, timeout=timeout)