Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def check_queue_size(q: SQLiteAckQueue):
"""Check queue size."""
try:
if q is not None:
if q.size > 0:
return [
checks.Info(
"queue contains pending messages", id=QUEUE_PENDING_INFO_ID
)
]
elif q.unack_count() > 0:
return [
checks.Info(
"queue contains unacked messages", id=QUEUE_UNACKED_INFO_ID
)
]
except Exception:
return [checks.Error("queue raised exception on access", id=QUEUE_ERROR_ID)]
else:
return []
def check_queue_size(q: SQLiteAckQueue):
"""Check queue size."""
try:
if q is not None:
if q.size > 0:
return [
checks.Info(
"queue contains pending messages", id=QUEUE_PENDING_INFO_ID
)
]
elif q.unack_count() > 0:
return [
checks.Info(
"queue contains unacked messages", id=QUEUE_UNACKED_INFO_ID
)
]
except Exception:
return [checks.Error("queue raised exception on access", id=QUEUE_ERROR_ID)]
else:
return []