Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
ntask = len(tasks)
# If not the master just wait for instructions.
if not self.is_master():
self.wait()
return
if function is not self.function:
if self.debug:
print("Master replacing pool function with {0}."
.format(function))
self.function = function
F = _function_wrapper(function)
# Tell all the workers what function to use.
requests = []
for i in range(self.size):
r = self.comm.isend(F, dest=i + 1)
requests.append(r)
# Wait until all of the workers have responded. See:
# https://gist.github.com/4176241
MPI.Request.waitall(requests)
if (not self.loadbalance) or (ntask <= self.size):
# Do not perform load-balancing - the default load-balancing
# scheme emcee uses.
# Send all the tasks off and wait for them to be received.
# Blocking receive to wait for instructions.
task = self.comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
if self.debug:
print("Worker {0} got task {1} with tag {2}."
.format(self.rank, task, status.tag))
# Check if message is special sentinel signaling end.
# If so, stop.
if isinstance(task, _close_pool_message):
if self.debug:
print("Worker {0} told to quit.".format(self.rank))
break
# Check if message is special type containing new function
# to be applied
if isinstance(task, _function_wrapper):
self.function = task.function
if self.debug:
print("Worker {0} replaced its task function: {1}."
.format(self.rank, self.function))
continue
# If not a special message, just run the known function on
# the input and return it asynchronously.
try:
result = self.function(task)
except:
tb = traceback.format_exc()
self.comm.isend(MPIPoolException(tb), dest=0, tag=status.tag)
return
if self.debug:
print("Worker {0} sending answer {1} with tag {2}."