Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def list(self):
    """
    Yield the tiles decoded from the messages of the Redis stream.

    Each iteration asks ``xreadgroup`` for at most one new message of the
    stream ``self._name`` on behalf of ``CONSUMER_NAME`` in ``STREAM_GROUP``.
    When nothing is returned, ``self._claim_olds()`` is used as a fallback
    source of messages. When that fallback returns ``None`` the
    ``nb_messages`` and ``pending`` gauges are reset to 0 and, if
    ``self._stop_if_empty`` is set, the generator ends.

    A message that cannot be decoded is logged, counted in the
    ``decode_error`` counter and skipped; it never aborts the iteration.
    """
    count = 0
    while True:
        # ">" is the XREADGROUP special ID asking for entries not yet
        # delivered to any consumer of the group.
        queues = self._master.xreadgroup(
            groupname=STREAM_GROUP,
            consumername=CONSUMER_NAME,
            streams={self._name: ">"},
            count=1,
            block=round(self._timeout_ms),
        )

        if not queues:
            queues = self._claim_olds()
            if queues is None:
                stats.set_gauge(["redis", self._name_str, "nb_messages"], 0)
                stats.set_gauge(["redis", self._name_str, "pending"], 0)
                if self._stop_if_empty:
                    break

        if not queues:
            # Nothing to process this round: poll again.
            continue

        for queue_name, queue_messages in queues:
            # Only one stream is requested, so only that one may be answered.
            assert queue_name == self._name
            # Each message is decoded and yielded exactly once.
            for id_, body in queue_messages:
                try:
                    tile = decode_message(body[b"message"], from_redis=True, sqs_message=id_)
                except Exception:
                    logger.warning("Failed decoding the Redis message", exc_info=True)
                    stats.increment_counter(["redis", self._name_str, "decode_error"])
                else:
                    # Yield outside of the ``try`` so that an exception thrown
                    # into the generator by the consumer is not swallowed and
                    # misreported as a decoding failure.
                    yield tile
                count += 1

        # Refresh the gauges only now and then to limit the extra Redis calls.
        if count % 10 == 0:
            stats.set_gauge(
                ["redis", self._name_str, "nb_messages"], self._slave.xlen(name=self._name)
            )
            pending = self._slave.xpending(self._name, STREAM_GROUP)
            stats.set_gauge(["redis", self._name_str, "pending"], pending["pending"])
def get_status(self):
    """
    Build a map of stats describing the stream.

    The stream length and the pending summary are read from
    ``self._slave``, the tiles in error come from ``self._get_errors()``,
    and the ``nb_messages`` gauge is updated with the stream length before
    the map is returned.
    """
    stream_length = self._slave.xlen(self._name)
    pending_summary = self._slave.xpending(self._name, STREAM_GROUP)
    error_tiles = self._get_errors()

    stats.set_gauge(["redis", self._name_str, "nb_messages"], stream_length)

    status = {}
    status["Approximate number of tiles to generate"] = stream_length
    status["Approximate number of generating tiles"] = pending_summary["pending"]
    status["Tiles in error"] = ", ".join(error_tiles)
    return status
if queues:
for redis_message in queues:
queue_name, queue_messages = redis_message
assert queue_name == self._name
for message in queue_messages:
id_, body = message
try:
tile = decode_message(body[b"message"], from_redis=True, sqs_message=id_)
yield tile
except Exception:
logger.warning("Failed decoding the Redis message", exc_info=True)
stats.increment_counter(["redis", self._name_str, "decode_error"])
count += 1
if count % 10 == 0:
stats.set_gauge(
["redis", self._name_str, "nb_messages"], self._slave.xlen(name=self._name)
)
pending = self._slave.xpending(self._name, STREAM_GROUP)
stats.set_gauge(["redis", self._name_str, "pending"], pending["pending"])