Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def insured(node, fun, *args, **kwargs):
"""Ensures any function performing a broadcast command completes
despite intermittent connection failures."""
def errback(exc, interval):
supervisor.error(
"Error while trying to broadcast %r: %r\n" % (fun, exc))
supervisor.pause()
return _insured(node.broker.pool, fun, args, kwargs,
on_revive=state.on_broker_revive,
errback=errback)
def on_connection_revived(self):
state.on_broker_revive()
def on_connection_revived(self):
state.on_broker_revive()
def consume_forever(self):
drain_events = self.drain_events
with celery.broker_connection() as conn:
conn.ensure_connection(self.on_connection_error,
celery.conf.BROKER_CONNECTION_MAX_RETRIES)
state.on_broker_revive()
self.info("Connected to %s" % (conn.as_uri(), ))
with conn.channel() as channel:
with self._consume_from(
*self.get_consumers(partial(Consumer, channel),
channel)):
while 1:
drain_events(conn, timeout=1)