Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Check if the message is a special type containing a new function
# to be applied
if isinstance(task, _function_wrapper):
self.function = task.function
if self.debug:
print("Worker {0} replaced its task function: {1}."
.format(self.rank, self.function))
continue
# If not a special message, just run the known function on
# the input and return it asynchronously.
try:
result = self.function(task)
except:
tb = traceback.format_exc()
self.comm.isend(MPIPoolException(tb), dest=0, tag=status.tag)
return
if self.debug:
print("Worker {0} sending answer {1} with tag {2}."
.format(self.rank, result, status.tag))
self.comm.isend(result, dest=0, tag=status.tag)
print("Sent task {0} to worker {1} with tag {2}."
.format(task, worker, i))
r = self.comm.isend(task, dest=worker, tag=i)
requests.append(r)
MPI.Request.waitall(requests)
# Now wait for the answers.
results = []
for i in range(ntask):
worker = i % self.size + 1
if self.debug:
print("Master waiting for worker {0} with tag {1}"
.format(worker, i))
result = self.comm.recv(source=worker, tag=i)
if isinstance(result, MPIPoolException):
print("One of the MPIPool workers failed with the "
"exception:")
print(result.traceback)
raise result
if callback is not None:
callback(result)
results.append(result)
return results
else:
# Perform load-balancing. The order of the results is likely to
# be different from the previous case.
for i, task in enumerate(tasks[0:self.size]):