Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Create a communication representing the ongoing communication, and store it in pending_comms.
# NOTE(review): `mbox`, `content` and `pending_comms` are set up before this excerpt begins.
comm = mbox.put_async(content, self.msg_size)
pending_comms.append(comm)

# Start sending messages to let the workers know that they should stop:
# one 'finalize' message for each of the first `self.receivers_count` mailboxes.
for i in range(self.receivers_count):
    mbox = mboxes[i]
    this_actor.info("Send 'finalize' to '{:s}'".format(str(mbox)))
    # Keep the handle so that the final wait below also covers the 'finalize' messages.
    # NOTE(review): the second argument is presumably the simulated payload size
    # (it is self.msg_size for regular messages above) - confirm.
    comm = mbox.put_async("finalize", 0)
    pending_comms.append(comm)
this_actor.info("Done dispatching all messages")

# Now that all message exchanges were initiated, wait for their completion in one single call
Comm.wait_all(pending_comms)

this_actor.info("Goodbye now!")
# Start sending messages to let the workers know that they should stop:
# one 'finalize' message for each of the first `self.receivers_count` mailboxes.
# NOTE(review): `mboxes` and `pending_comms` are set up before this excerpt begins.
for i in range(self.receivers_count):
    mbox = mboxes[i]
    this_actor.info("Send 'finalize' to '{:s}'".format(str(mbox)))
    # Keep the handle so that the wait loop below also covers the 'finalize' messages.
    comm = mbox.put_async("finalize", 0)
    pending_comms.append(comm)
this_actor.info("Done dispatching all messages")

# Now that all message exchanges were initiated, wait for their completion, in order of completion.
#
# This loop waits for the first communication to terminate with wait_any() and removes it from
# the list with del, until all comms are terminated.
# Even in this simple example, the pending comms do not terminate in the exact same order of creation.
while pending_comms:
    # wait_any() yields the position, within pending_comms, of the comm that just terminated.
    changed_pos = Comm.wait_any(pending_comms)
    del pending_comms[changed_pos]
    # Position 0 is the oldest pending comm; any other position means an out-of-order completion.
    if changed_pos != 0:
        this_actor.info(
            "Remove the {:d}th pending comm: it terminated earlier than another comm that was initiated first.".format(changed_pos))

this_actor.info("Goodbye now!")