Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
try:
if q is not None:
if q.size > 0:
return [
checks.Info(
"queue contains pending messages", id=QUEUE_PENDING_INFO_ID
)
]
elif q.unack_count() > 0:
return [
checks.Info(
"queue contains unacked messages", id=QUEUE_UNACKED_INFO_ID
)
]
except Exception:
return [checks.Error("queue raised exception on access", id=QUEUE_ERROR_ID)]
else:
return []
def check_disk_bytes_free(app: Sanic, q: SQLiteAckQueue):
    """Check that the filesystem holding the queue has enough bytes free.

    The minimum is read from ``app.config["MINIMUM_DISK_FREE_BYTES"]``.
    The check is skipped when there is no queue, no threshold is configured,
    or the queue has no on-disk path (``None`` or ``":memory:"``).

    Returns:
        A list of check messages: empty when the check is skipped or passes,
        a single ``checks.Warning`` when the queue path does not exist, or a
        single ``checks.Error`` when free space is below the threshold.
    """
    threshold = app.config.get("MINIMUM_DISK_FREE_BYTES")
    if q is None or threshold is None:
        return []
    if q.path in (None, ":memory:"):
        # Nothing on disk to measure.
        return []

    try:
        status = os.statvfs(q.path)
    except FileNotFoundError:
        return [checks.Warning("queue path does not exist", id=NO_QUEUE_WARNING_ID)]

    # Use f_bavail rather than f_bfree: f_bfree also counts blocks reserved
    # for the superuser, which an unprivileged process cannot write to, so it
    # overstates the space the queue can actually use.
    bytes_free = status.f_bavail * status.f_frsize
    if bytes_free < threshold:
        return [checks.Error("disk bytes free below threshold", id=LOW_DISK_ERROR_ID)]
    return []