Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_dq_name_returns_canonical_delay_names(given, expected):
assert dq_name(given) == expected
def test_redis_requeues_unhandled_delay_messages_on_shutdown(redis_broker):
# Given that I have an actor that takes its time
@dramatiq.actor
def do_work():
pass
# If I send it a delayed message
message = do_work.send_with_options(delay=10000)
# Then start a worker and subsequently shut it down
with worker(redis_broker, worker_threads=1):
pass
# I expect it to have re-enqueued the message
messages = redis_broker.client.lrange("dramatiq:%s" % dq_name(do_work.queue_name), 0, 10)
assert message.options["redis_message_id"].encode("utf-8") in messages
def flush(self, queue_name):
"""Drop all the messages from a queue.
Parameters:
queue_name(str): The queue to flush.
"""
for name in (queue_name, dq_name(queue_name)):
self.do_purge(name)
def tab_from_q_name(name):
if name == dq_name(name):
return "delayed"
elif name == xq_name(name):
return "failed"
else:
return "standard"
def enqueue(self, message, *, delay=None):
"""Enqueue a message.
Parameters:
message(Message): The message to enqueue.
delay(int): The minimum amount of time, in milliseconds, to
delay the message by.
Raises:
QueueNotFound: If the queue the message is being enqueued on
doesn't exist.
"""
queue_name = message.queue_name
if delay is not None:
queue_name = dq_name(queue_name)
message_eta = current_millis() + delay
message = message.copy(
queue_name=queue_name,
options={
"eta": message_eta,
},
)
if queue_name not in self.queues:
raise QueueNotFound(queue_name)
self.emit_before("enqueue", message, delay)
self.queues[queue_name].put(message.encode())
self.emit_after("enqueue", message, delay)
return message
QueueJoinTimeout: When the timeout elapses.
Parameters:
queue_name(str): The queue to wait on.
interval(Optional[int]): The interval, in milliseconds, at
which to check the queues.
timeout(Optional[int]): The max amount of time, in
milliseconds, to wait on this queue.
"""
deadline = timeout and time.monotonic() + timeout / 1000
while True:
if deadline and time.monotonic() >= deadline:
raise QueueJoinTimeout(queue_name)
size = 0
for name in (queue_name, dq_name(queue_name)):
size += self.do_qsize(name)
if size == 0:
return
time.sleep(interval / 1000)
def queue_for_tab(name, tab):
return {
"standard": name,
"delayed": dq_name(name),
"failed": xq_name(name),
}[tab]