Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Closure that raises while ``failure_time`` is still falsy and records a
# success otherwise. It rebinds ``failure_time`` and ``success_time`` in the
# enclosing function, which is not part of this excerpt.
def do_work():
nonlocal failure_time, success_time
# No failure recorded yet: remember when this call happened, then fail.
if not failure_time:
failure_time = current_millis()
raise RuntimeError("First failure.")
else:
# A failure was recorded earlier: remember when this call happened and
# call ``succeeded.set()`` (presumably an event the caller waits on -- confirm).
success_time = current_millis()
succeeded.set()
def test_redis_actors_can_have_their_messages_delayed(redis_broker, redis_worker):
    """A message sent with a delay must not run before that delay has elapsed."""
    delay_ms = 1000

    # Given the time at which the test started and a slot for the actor's run time
    sent_at = current_millis()
    ran_at = None

    # And an actor that records the time it ran
    @dramatiq.actor()
    def record():
        nonlocal ran_at
        ran_at = current_millis()

    # When I send that actor a delayed message
    record.send_with_options(delay=delay_ms)

    # And join on the queue and on the worker
    redis_broker.join(record.queue_name)
    redis_worker.join()

    # Then the actor must have run at least ``delay_ms`` milliseconds after the start
    elapsed = ran_at - sent_at
    assert elapsed >= delay_ms
def test_redis_broker_maintains_backwards_compat_with_old_acks(redis_broker):
    """Maintenance must migrate expired old-style acks and keep the valid ones."""
    # Given that I have an actor
    @dramatiq.actor
    def do_work(self):
        pass

    # And that actor has some old-style unacked messages
    old_acks_key = "dramatiq:default.acks"
    expired_message_id = b"expired-old-school-ack"
    valid_message_id = b"valid-old-school-ack"
    legacy_zadd = redis.__version__.startswith("2.")

    def add_old_style_ack(message_id, score):
        # The 2.x client is called with the score and member positionally;
        # any other version is called with a ``{member: score}`` mapping.
        if legacy_zadd:
            redis_broker.client.zadd(old_acks_key, score, message_id)
        else:
            redis_broker.client.zadd(old_acks_key, {message_id: score})

    add_old_style_ack(expired_message_id, 0)
    add_old_style_ack(valid_message_id, current_millis())

    # When maintenance runs for that actor's queue
    # (presumably setting the chance to MAINTENANCE_SCALE forces it -- confirm)
    redis_broker.maintenance_chance = MAINTENANCE_SCALE
    redis_broker.do_qsize(do_work.queue_name)

    # Then maintenance should move the expired message to the new style acks set
    new_acks_key = "dramatiq:__acks__.%s.default" % redis_broker.broker_id
    migrated = set(redis_broker.client.smembers(new_acks_key))
    assert migrated == {expired_message_id}

    # And the valid message should stay in the old style set
    remaining = set(redis_broker.client.zrangebyscore(old_acks_key, 0, "+inf"))
    assert remaining == {valid_message_id}
import time
from pathlib import Path
import pytest
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.common import current_millis
from .common import skip_in_ci, skip_on_pypy, skip_on_windows
# Broker instance that the ``write_loaded_at`` actor below is declared with.
broker = RedisBroker()
# Millisecond timestamp taken once, when this module is executed.
loaded_at = current_millis()
@dramatiq.actor(broker=broker)
def write_loaded_at(filename):
    """Write the module-level ``loaded_at`` value, as text, to ``filename``."""
    stamp = str(loaded_at)
    with open(filename, "w") as out_file:
        out_file.write(stamp)
@skip_in_ci
@skip_on_windows
@skip_on_pypy
@pytest.mark.parametrize("extra_args", [
(),
("--watch-use-polling",),
])
def test_cli_can_watch_for_source_code_changes(start_cli, extra_args):
def test_rabbitmq_actors_can_have_their_messages_delayed(rabbitmq_broker, rabbitmq_worker):
    """A message sent with a delay must not run before that delay has elapsed."""
    delay_ms = 1000

    # Given the time at which the test started and a slot for the actor's run time
    sent_at, ran_at = current_millis(), None

    # And an actor that records the time it ran
    @dramatiq.actor
    def record():
        nonlocal ran_at
        ran_at = current_millis()

    # When I send that actor a delayed message
    record.send_with_options(delay=delay_ms)

    # And join on the queue and on the worker
    rabbitmq_broker.join(record.queue_name)
    rabbitmq_worker.join()

    # Then the actor must have run at least ``delay_ms`` milliseconds after the start
    elapsed = ran_at - sent_at
    assert elapsed >= delay_ms
def after_process_message(self, broker, message, *, result=None, exception=None):
    """Update the per-message metrics once a message has been processed.

    Observes the message's duration, decrements the in-progress metric
    and increments the total metric; the errored metric is incremented
    as well when ``exception`` is not ``None``.
    """
    queue_and_actor = (message.queue_name, message.actor_name)

    # When no start time was stored for this message, "now" is used as the
    # start instead.
    started_at = self.message_start_times.pop(message.message_id, current_millis())
    elapsed = current_millis() - started_at

    self.message_durations.labels(*queue_and_actor).observe(elapsed)
    self.inprogress_messages.labels(*queue_and_actor).dec()
    self.total_messages.labels(*queue_and_actor).inc()
    if exception is None:
        return

    self.total_errored_messages.labels(*queue_and_actor).inc()
Raises:
ValueError: If ``delay`` is longer than 7 days.
"""
queue_name = message.queue_name
# Each enqueued message must have a unique id in Redis so
# using the Message's id isn't safe because messages may be
# retried.
message = message.copy(options={
"redis_message_id": str(uuid4()),
})
if delay is not None:
queue_name = dq_name(queue_name)
message_eta = current_millis() + delay
message = message.copy(
queue_name=queue_name,
options={
"eta": message_eta,
},
)
self.logger.debug("Enqueueing message %r on queue %r.", message.message_id, queue_name)
self.emit_before("enqueue", message, delay)
self.do_enqueue(queue_name, message.options["redis_message_id"], message.encode())
self.emit_after("enqueue", message, delay)
return message
def do_dispatch(queue_name, *args):
    """Call ``dispatch`` for ``command`` on ``queue_name``.

    The arguments passed on are a fixed prefix (command, current
    timestamp, queue name, broker id, heartbeat timeout, dead message
    ttl and the maintenance flag) followed by the caller's ``args``.
    ``command``, ``self``, ``dispatch`` and ``keys`` come from the
    enclosing scope.
    """
    now = current_millis()
    script_args = [
        command,
        now,
        queue_name,
        self.broker_id,
        self.heartbeat_timeout,
        self.dead_message_ttl,
        self._should_do_maintenance(command),
    ]
    script_args.extend(args)
    return dispatch(args=script_args, keys=keys)
return do_dispatch
def handle_delayed_messages(self):
    """Move every delayed message whose eta has passed onto its real queue."""
    delayed = self.delay_queue
    for eta, message in iter_queue(delayed):
        if eta > current_millis():
            # Not due yet: put the entry back, balance the get that
            # produced it, and stop. (Presumably the queue yields the
            # smallest eta first, so nothing after it is due -- confirm.)
            delayed.put((eta, message))
            delayed.task_done()
            break

        # Due: re-enqueue a copy on the queue named by ``q_name`` with the
        # ``eta`` option removed.
        due_message = message.copy(queue_name=q_name(message.queue_name))
        del due_message.options["eta"]
        self.broker.enqueue(due_message)

        # The original delayed message is the one that gets post-processed.
        self.post_process_message(message)
        delayed.task_done()
def before_process_message(self, broker, message):
    """Update the in-progress metrics and store the message's start time."""
    metric_labels = (message.queue_name, message.actor_name)
    message_id = message.message_id

    # A message tracked as delayed stops counting as an in-progress delayed
    # message once it is about to be processed.
    if message_id in self.delayed_messages:
        self.delayed_messages.remove(message_id)
        self.inprogress_delayed_messages.labels(*metric_labels).dec()

    self.inprogress_messages.labels(*metric_labels).inc()
    self.message_start_times[message_id] = current_millis()