Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_q_name_returns_canonical_names(given, expected):
assert q_name(given) == expected
def handle_delayed_messages(self):
"""Enqueue any delayed messages whose eta has passed.
"""
for eta, message in iter_queue(self.delay_queue):
if eta > current_millis():
self.delay_queue.put((eta, message))
self.delay_queue.task_done()
break
queue_name = q_name(message.queue_name)
new_message = message.copy(queue_name=queue_name)
del new_message.options["eta"]
self.broker.enqueue(new_message)
self.post_process_message(message)
self.delay_queue.task_done()
def _add_consumer(self, queue_name, *, delay=False):
if queue_name in self.consumers:
self.logger.debug("A consumer for queue %r is already running.", queue_name)
return
canonical_name = q_name(queue_name)
if self.consumer_whitelist and canonical_name not in self.consumer_whitelist:
self.logger.debug("Dropping consumer for queue %r: not whitelisted.", queue_name)
return
consumer = self.consumers[queue_name] = _ConsumerThread(
broker=self.broker,
queue_name=queue_name,
prefetch=self.delay_prefetch if delay else self.queue_prefetch,
work_queue=self.work_queue,
worker_timeout=self.worker_timeout,
)
consumer.start()
def build_message_key(self, message) -> str:
"""Given a message, return its globally-unique key.
Parameters:
message(Message)
Returns:
str
"""
message_key = "%(namespace)s:%(queue_name)s:%(actor_name)s:%(message_id)s" % {
"namespace": self.namespace,
"queue_name": q_name(message.queue_name),
"actor_name": message.actor_name,
"message_id": message.message_id,
}
return hashlib.md5(message_key.encode("utf-8")).hexdigest()
message_id = req.post_data["id"]
job = self.iface.get_job(queue, message_id)
if not job:
return HTTP_410, "The requested message no longer exists."
self.iface.delete_message(queue, message_id)
job_copy = Job.from_message(self.broker.enqueue(job.message.copy(
queue_name=q_name(queue),
options={
"eta": job.message_timestamp,
"retries": 0
},
)))
return redirect(self.make_uri(
"queues",
q_name(queue),
tab_from_q_name(queue),
job_copy.message_id,
))
def job(self, req, *, name, current_tab, message_id):
qft = queue_for_tab(name, current_tab)
queue = self.iface.get_queue(q_name(name))
job = self.iface.get_job(qft, message_id)
if not job:
return redirect(self.make_uri("queues", name, current_tab))
return {
"queue": queue,
"job": job,
"queue_for_tab": qft,
}
def delete_message(self, req):
if req.method != "POST":
return HTTP_405, "Expected a POST request."
queue = req.post_data["queue"]
message_id = req.post_data["id"]
self.iface.delete_message(queue, message_id)
return redirect(self.make_uri("queues", q_name(queue), tab_from_q_name(queue)))