Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_groups_expose_completion_stats(stub_broker, stub_worker, result_backend):
    """A group's ``completed_count`` grows by one per finished job, and
    ``completed`` is true once all of them are done."""
    # Given: the broker stores results in the supplied backend
    stub_broker.add_middleware(Results(backend=result_backend))

    # And: an actor that sleeps for ``n`` seconds, then signals that it finished
    job_finished = Condition()

    @dramatiq.actor(store_results=True)
    def wait(n):
        time.sleep(n)
        with job_finished:
            job_finished.notify_all()
            return n

    # When: three messages with sleep durations 1, 2 and 3 are grouped and run
    jobs = group([wait.message(seconds) for seconds in range(1, 4)])
    jobs.run()

    # Then: after each notification the completed count has gone up by one
    total_jobs = len(jobs)
    for expected_done in range(1, total_jobs + 1):
        with job_finished:
            job_finished.wait(5)
            # Short pause so the worker has time to store the result
            # before the count is read.
            time.sleep(0.1)
            assert jobs.completed_count == expected_done

    # Finally: the group as a whole reports completion
    assert jobs.completed
def test_groups_can_time_out(stub_broker, stub_worker, result_backend):
    """Waiting on a group with a timeout that is too short raises
    ``ResultTimeout`` and leaves the group not completed."""
    # Given: the broker stores results in the supplied backend
    stub_broker.add_middleware(Results(backend=result_backend))

    # And: an actor whose body sleeps for 300ms
    @dramatiq.actor(store_results=True)
    def wait():
        time.sleep(0.3)

    # When: two of these jobs are grouped and started
    pending = group([wait.message() for _ in range(2)])
    pending.run()

    # Then: waiting with timeout=100 fails with ResultTimeout
    # NOTE(review): dramatiq documents timeouts in milliseconds, so 100 is
    # shorter than the 300ms sleep -- confirm against the installed version.
    with pytest.raises(ResultTimeout):
        pending.wait(timeout=100)

    # And: the group has not finished yet
    assert not pending.completed
def test_groups_execute_jobs_in_parallel(stub_broker, stub_worker, result_backend):
    """Five 100ms jobs run as a group finish within 500ms overall and yield
    one result per job."""
    # Given: the broker stores results in the supplied backend
    stub_broker.add_middleware(Results(backend=result_backend))

    # And: an actor whose body sleeps for 100ms
    @dramatiq.actor(store_results=True)
    def wait():
        time.sleep(0.1)

    # When: five of these jobs are grouped and run, timing from before creation
    started_at = time.monotonic()
    batch = group([wait.message() for _ in range(5)])
    batch.run()

    # And: every result is collected, blocking until each is available
    collected = list(batch.get_results(block=True))
    elapsed = time.monotonic() - started_at

    # Then: the whole run took no more than 500ms
    assert elapsed <= 0.5

    # And: there is exactly one result per job in the group
    assert len(collected) == len(batch)

    # And: the group reports completion
    assert batch.completed
def test_groups_execute_inner_groups(stub_broker, stub_worker, result_backend):
    """A group of three two-job groups finishes within 500ms and returns
    results nested the same way."""
    # Given: the broker stores results in the supplied backend
    stub_broker.add_middleware(Results(backend=result_backend))

    # And: an actor whose body sleeps for 100ms (and returns nothing)
    @dramatiq.actor(store_results=True)
    def wait():
        time.sleep(0.1)

    # When: three inner groups of two jobs each are wrapped in one outer group
    started_at = time.monotonic()
    inner_groups = [group(wait.message() for _ in range(2)) for _ in range(3)]
    outer = group(inner_groups)
    outer.run()

    # And: every result is collected, blocking until each is available
    nested_results = list(outer.get_results(block=True))
    elapsed = time.monotonic() - started_at

    # Then: the whole run took no more than 500ms
    assert elapsed <= 0.5

    # And: the outcome is three lists, each holding two ``None`` results
    expected = [[None, None] for _ in range(3)]
    assert nested_results == expected

    # And: the outer group reports completion
    assert outer.completed
def main():
    """Run a ``request | count_words`` pipeline for each URI given on the
    command line and print the resulting count per URI.

    ``request`` and ``count_words`` are actors defined outside this block.

    Returns:
        0 once all results have been printed.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("uri", nargs="+", help="A website URI.")
    uris = cli.parse_args().uri

    # One pipeline per URI: the output of ``request`` feeds ``count_words``.
    pipelines = [request.message(uri) | count_words.message() for uri in uris]
    jobs = group(pipelines).run()

    # Results are paired with the URIs by position; block until each is ready.
    word_counts = jobs.get_results(block=True)
    for uri, count in zip(uris, word_counts):
        print(f" * {uri} has {count} words")

    return 0