Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
ntask = len(tasks)
# If not the master just wait for instructions.
if not self.is_master():
self.wait()
return
if function is not self.function:
if self.debug:
print("Master replacing pool function with {0}."
.format(function))
self.function = function
F = _function_wrapper(function)
# Tell all the workers what function to use.
requests = []
for i in range(self.size):
r = self.comm.isend(F, dest=i + 1)
requests.append(r)
# Wait until all of the workers have responded. See:
# https://gist.github.com/4176241
MPI.Request.waitall(requests)
if (not self.loadbalance) or (ntask <= self.size):
# Do not perform load-balancing - the default load-balancing
# scheme emcee uses.
# Send all the tasks off and wait for them to be received.
# Check if message is special type containing new function
# to be applied
if isinstance(task, _function_wrapper):
self.function = task.function
if self.debug:
print("Worker {0} replaced its task function: {1}."
.format(self.rank, self.function))
continue
# If not a special message, just run the known function on
# the input and return it asynchronously.
try:
result = self.function(task)
except:
tb = traceback.format_exc()
self.comm.isend(MPIPoolException(tb), dest=0, tag=status.tag)
return
if self.debug:
print("Worker {0} sending answer {1} with tag {2}."
.format(self.rank, result, status.tag))
self.comm.isend(result, dest=0, tag=status.tag)
print("Sent task {0} to worker {1} with tag {2}."
.format(task, worker, i))
r = self.comm.isend(task, dest=worker, tag=i)
requests.append(r)
MPI.Request.waitall(requests)
# Now wait for the answers.
results = []
for i in range(ntask):
worker = i % self.size + 1
if self.debug:
print("Master waiting for worker {0} with tag {1}"
.format(worker, i))
result = self.comm.recv(source=worker, tag=i)
if isinstance(result, MPIPoolException):
print("One of the MPIPool workers failed with the "
"exception:")
print(result.traceback)
raise result
if callback is not None:
callback(result)
results.append(result)
return results
else:
# Perform load-balancing. The order of the results is likely to
# be different from the previous case.
for i, task in enumerate(tasks[0:self.size]):
def close(self):
    """
    Tell the pool members to shut down.

    When called on the master process, post one ``isend`` carrying a
    fresh :class:`_close_pool_message` sentinel to every destination
    rank from ``1`` through ``self.size``. The send requests are not
    waited on. On any process that is not the master this does nothing.
    """
    if not self.is_master():
        return
    # Destinations start at rank 1, so rank 0 is never sent the sentinel.
    for worker in range(1, self.size + 1):
        self.comm.isend(_close_pool_message(), dest=worker)
while True:
# Event loop.
# Sit here and await instructions.
if self.debug:
print("Worker {0} waiting for task.".format(self.rank))
# Blocking receive to wait for instructions.
task = self.comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
if self.debug:
print("Worker {0} got task {1} with tag {2}."
.format(self.rank, task, status.tag))
# Check if message is special sentinel signaling end.
# If so, stop.
if isinstance(task, _close_pool_message):
if self.debug:
print("Worker {0} told to quit.".format(self.rank))
break
# Check if message is special type containing new function
# to be applied
if isinstance(task, _function_wrapper):
self.function = task.function
if self.debug:
print("Worker {0} replaced its task function: {1}."
.format(self.rank, self.function))
continue
# If not a special message, just run the known function on
# the input and return it asynchronously.
try:
# Blocking receive to wait for instructions.
task = self.comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
if self.debug:
print("Worker {0} got task {1} with tag {2}."
.format(self.rank, task, status.tag))
# Check if message is special sentinel signaling end.
# If so, stop.
if isinstance(task, _close_pool_message):
if self.debug:
print("Worker {0} told to quit.".format(self.rank))
break
# Check if message is special type containing new function
# to be applied
if isinstance(task, _function_wrapper):
self.function = task.function
if self.debug:
print("Worker {0} replaced its task function: {1}."
.format(self.rank, self.function))
continue
# If not a special message, just run the known function on
# the input and return it asynchronously.
try:
result = self.function(task)
except:
tb = traceback.format_exc()
self.comm.isend(MPIPoolException(tb), dest=0, tag=status.tag)
return
if self.debug:
print("Worker {0} sending answer {1} with tag {2}."
from setuptools import setup
setup
except ImportError:
from distutils.core import setup
setup
if sys.version_info[0] < 3:
import __builtin__ as builtins
else:
import builtins
builtins.__MPIPOOL_SETUP__ = True
import mpipool
setup(
name="mpipool",
version=mpipool.__version__,
author="Adrian Price-Whelan",
author_email="adrn@astro.columbia.edu",
packages=["mpipool"],
url="https://github.com/adrn/mpipool/",
license="MIT",
description="MPI pool",
package_data={"": ["LICENSE", "AUTHORS"]},
include_package_data=True,
classifiers=[
"Intended Audience :: Developers",
"Intended Audience :: Science/Research",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python",
],