Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def after_process_message(self, broker, message, *, result=None, exception=None):
    """Enqueue a group's completion callbacks after a message succeeds.

    Nothing happens when ``exception`` is set, or when the message does
    not carry both the ``group_completion_uuid`` and
    ``group_completion_callbacks`` options.  Otherwise a ``Barrier`` keyed
    by the group's completion uuid is waited on without blocking, and if
    that wait reports success every stored callback is rebuilt as a
    ``Message`` and enqueued on ``broker``.

    Parameters:
      broker: The broker the callbacks are enqueued on.
      message: The message that was just processed; only its
        ``options`` mapping is read.
      result: Unused by this hook.
      exception: The exception raised while processing, or None.
    """
    # Imported at call time rather than at module level.
    # NOTE(review): presumably to avoid an import cycle -- confirm.
    from ..message import Message

    if exception is not None:
        return

    completion_uuid = message.options.get("group_completion_uuid")
    callbacks = message.options.get("group_completion_callbacks")
    if not (completion_uuid and callbacks):
        return

    barrier = Barrier(self.rate_limiter_backend, completion_uuid, ttl=GROUP_CALLBACK_BARRIER_TTL)
    # NOTE(review): presumably wait() only returns a truthy value for the
    # final member of the group to finish -- confirm against Barrier.
    if not barrier.wait(block=False):
        return

    for callback_data in callbacks:
        broker.enqueue(Message(**callback_data))
def after_process_message(self, broker, message, *, result=None, exception=None):
    """Hand a successfully processed message over to its pipe target.

    Nothing happens when ``exception`` is set or ``message.failed`` is
    truthy.  Otherwise, if the message's options contain a
    ``pipe_target``, that target is rebuilt as a ``Message`` and enqueued
    on ``broker``.  ``result`` is appended to the target's positional
    arguments unless ``pipe_ignore`` is truthy in either the target's
    options or the options of the actor that handled ``message``.

    Parameters:
      broker: The broker used to look up the actor and to enqueue the
        next message.
      message: The message that was just processed.
      result: The value to forward to the pipe target.
      exception: The exception raised while processing, or None.
    """
    # Since Pipelines is a default middleware, this import has to
    # happen at runtime in order to avoid a cyclic dependency
    # from broker -> pipelines -> messages -> broker.
    from ..message import Message

    succeeded = exception is None and not message.failed
    if not succeeded:
        return

    # The actor is looked up before the pipe target is inspected, so the
    # lookup happens even for messages that have no target.
    actor = broker.get_actor(message.actor_name)
    target_data = message.options.get("pipe_target")
    if target_data is None:
        return

    target = Message(**target_data)
    skip_result = target.options.get("pipe_ignore") or actor.options.get("pipe_ignore")
    if not skip_result:
        target = target.copy(args=target.args + (result,))
    broker.enqueue(target)
kwargs(dict): Keyword arguments that are passed to the actor.
**options(dict): Arbitrary options that are passed to the
broker and any registered middleware.
Returns:
Message: A message that can be enqueued on a broker.
"""
for name in ["on_failure", "on_success"]:
callback = options.get(name)
if isinstance(callback, Actor):
options[name] = callback.actor_name
elif not isinstance(callback, (type(None), str)):
raise TypeError(name + " value must be an Actor")
return Message(
queue_name=self.queue_name,
actor_name=self.actor_name,
args=args or (), kwargs=kwargs or {},
options=options,
)