Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __next__(self):
try:
method, properties, body = next(self.iterator)
if method is None:
return None
message = Message.decode(body)
self.known_tags.add(method.delivery_tag)
return _RabbitmqMessage(method.delivery_tag, message)
except (AssertionError,
pika.exceptions.AMQPConnectionError,
pika.exceptions.AMQPChannelError) as e:
raise ConnectionClosed(e) from None
def __next__(self):
try:
while True:
try:
# This is a micro-optimization so we try the fast
# path first. We assume there are messages in the
# cache and if there aren't, we go down the slow
# path of doing network IO.
data = self.message_cache.pop(0)
self.misses = 0
message = Message.decode(data)
return MessageProxy(message)
except IndexError:
# If there are fewer messages currently being
# processed than we're allowed to prefetch,
# prefetch up to that number of messages.
messages = []
if self.message_refc < self.prefetch:
self.message_cache = messages = self.broker.do_fetch(
self.queue_name,
self.prefetch - self.message_refc,
)
# Because we didn't get any messages, we should
# progressively long poll up to the idle timeout.
if not messages:
self.misses, backoff_ms = compute_backoff(self.misses, max_backoff=self.timeout)
def get_job(self, queue_name, message_id):
data = self.broker.client.hget(self.qualify(f"{queue_name}.msgs"), message_id)
return data and Job.from_message(Message.decode(data))
def __next__(self):
try:
data = self.queue.get(timeout=self.timeout / 1000)
message = Message.decode(data)
return MessageProxy(message)
except Empty:
return None
def get_jobs(self, queue_name, cursor=0):
next_cursor, messages_data = self.broker.client.hscan(
self.qualify(f"{queue_name}.msgs"), cursor, count=300,
)
if next_cursor == cursor:
next_cursor = None
jobs = [Job.from_message(Message.decode(data)) for data in messages_data.values()]
return next_cursor, sorted(jobs, key=attrgetter("timestamp"), reverse=True)