Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
__all__ = ['ThreadPool','_ThreadPool']
#FIXME: probably not good enough... should store each instance with a uid
__STATE = _ThreadPool__STATE = {}
from pathos.abstract_launcher import AbstractWorkerPool
from pathos.helpers.mp_helper import starargs as star
from pathos.helpers import cpu_count, ThreadPool as _ThreadPool
try:
from itertools import izip as zip
except ImportError:
pass
class ThreadPool(AbstractWorkerPool):
"""
Mapper that leverages python's threading.
"""
def __init__(self, *args, **kwds):
"""\nNOTE: if number of nodes is not given, will autodetect processors
"""
hasnodes = 'nodes' in kwds; arglen = len(args)
if 'nthreads' in kwds and (hasnodes or arglen):
msg = "got multiple values for keyword argument 'nthreads'"
raise TypeError(msg)
elif hasnodes: #XXX: multiple try/except is faster?
if arglen:
msg = "got multiple values for keyword argument 'nodes'"
raise TypeError(msg)
kwds['nthreads'] = kwds.pop('nodes')
elif arglen:
#FIXME: probably not good enough... should store each instance with a uid
__STATE = _ProcessPool__STATE = {}
from pathos.abstract_launcher import AbstractWorkerPool
from pathos.helpers.mp_helper import starargs as star
from pathos.helpers import cpu_count, freeze_support, ProcessPool as Pool
try:
from itertools import izip as zip
except ImportError:
pass
# 'forward' compatibility
_ProcessPool = Pool
class ProcessPool(AbstractWorkerPool):
"""
Mapper that leverages python's multiprocessing.
"""
def __init__(self, *args, **kwds):
"""\nNOTE: if number of nodes is not given, will autodetect processors
"""
hasnodes = 'nodes' in kwds; arglen = len(args)
if 'ncpus' in kwds and (hasnodes or arglen):
msg = "got multiple values for keyword argument 'ncpus'"
raise TypeError(msg)
elif hasnodes: #XXX: multiple try/except is faster?
if arglen:
msg = "got multiple values for keyword argument 'nodes'"
raise TypeError(msg)
kwds['ncpus'] = kwds.pop('nodes')
elif arglen:
Notes
=====
This worker pool leverages the built-in python maps, and thus does not have
limitations due to serialization of the function f or the sequences in args.
The maps in this worker pool have full functionality whether run from a script
or in the python interpreter, and work reliably for both imported and
interactively-defined functions.
"""
__all__ = ['SerialPool']
from pathos.abstract_launcher import AbstractWorkerPool
__get_nodes__ = AbstractWorkerPool._AbstractWorkerPool__get_nodes
__set_nodes__ = AbstractWorkerPool._AbstractWorkerPool__set_nodes
try:
from builtins import map as _map
_apply = lambda f, args, kwds: f(*args, **kwds)
_imap = _map
PY3 = True
import sys
P33 = (hex(sys.hexversion) >= '0x30300f0')
except ImportError:
from __builtin__ import map as _map, apply as _apply
from itertools import imap as _imap
PY3 = False
P33 = False
#XXX: good for interface... or bad idea?
__STATE = _SerialPool__STATE = {}
Notes
=====
This worker pool leverages the built-in python maps, and thus does not have
limitations due to serialization of the function f or the sequences in args.
The maps in this worker pool have full functionality whether run from a script
or in the python interpreter, and work reliably for both imported and
interactively-defined functions.
"""
__all__ = ['SerialPool']
from pathos.abstract_launcher import AbstractWorkerPool
__get_nodes__ = AbstractWorkerPool._AbstractWorkerPool__get_nodes
__set_nodes__ = AbstractWorkerPool._AbstractWorkerPool__set_nodes
try:
from builtins import map as _map
_apply = lambda f, args, kwds: f(*args, **kwds)
_imap = _map
PY3 = True
import sys
P33 = (hex(sys.hexversion) >= '0x30300f0')
except ImportError:
from __builtin__ import map as _map, apply as _apply
from itertools import imap as _imap
PY3 = False
P33 = False
#XXX: good for interface... or bad idea?
__STATE = _SerialPool__STATE = {}
'progargs' : '',
'outfile' : 'results%sout' % _pid,
'errfile' : 'errors%sout' % _pid,
'jobfile' : 'job%sid' % _pid,
'scheduler' : '',
'timelimit' : '00:02',
'queue' : 'normal',
'workdir' : '.'
}
#FIXME FIXME: __init__ and self for 'nodes' vs 'ncpus' is confused; see __repr__
class Mapper(AbstractWorkerPool):
"""
Mapper base class for pipe-based mapping.
"""
def __init__(self, *args, **kwds):
"""\nNOTE: if number of nodes is not given, will default to 1.
If source is not given, will attempt to minimially use TemporaryFiles.
If workdir is not given, will default to scheduler's workdir or $WORKDIR.
If scheduler is not given, will default to only run on the current node.
If timeout is not given, will default to scheduler's timelimit or INF.
For more details, see the docstrings for the "map" method, or the man page
for the associated launcher (e.g mpirun, mpiexec).
"""
AbstractWorkerPool.__init__(self, *args, **kwds)
self.scheduler = kwds.get('scheduler', None)
self.scatter = True #bool(kwds.get('scatter', True))
clear = _clear
def map(self, f, *args, **kwds):
AbstractWorkerPool._AbstractWorkerPool__map(self, f, *args, **kwds)
_pool = self._serve()
return _pool.map(star(f), zip(*args)) # chunksize
map.__doc__ = AbstractWorkerPool.map.__doc__
def imap(self, f, *args, **kwds):
AbstractWorkerPool._AbstractWorkerPool__imap(self, f, *args, **kwds)
_pool = self._serve()
return _pool.imap(star(f), zip(*args)) # chunksize
imap.__doc__ = AbstractWorkerPool.imap.__doc__
def uimap(self, f, *args, **kwds):
AbstractWorkerPool._AbstractWorkerPool__imap(self, f, *args, **kwds)
_pool = self._serve()
return _pool.imap_unordered(star(f), zip(*args)) # chunksize
uimap.__doc__ = AbstractWorkerPool.uimap.__doc__
def amap(self, f, *args, **kwds): # register a callback ?
AbstractWorkerPool._AbstractWorkerPool__map(self, f, *args, **kwds)
_pool = self._serve()
return _pool.map_async(star(f), zip(*args)) # chunksize
amap.__doc__ = AbstractWorkerPool.amap.__doc__
########################################################################
# PIPES
def pipe(self, f, *args, **kwds):
#AbstractWorkerPool._AbstractWorkerPool__pipe(self, f, *args, **kwds)
_pool = self._serve()
return _pool.apply(f, args, kwds)
pipe.__doc__ = AbstractWorkerPool.pipe.__doc__
def apipe(self, f, *args, **kwds): # register a callback ?
#AbstractWorkerPool._AbstractWorkerPool__apipe(self, f, *args, **kwds)
_pool = self._serve()
return _pool.apply_async(f, args, kwds)