Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from pyina.tools import isoseconds
self.timeout = isoseconds(self.scheduler.timelimit)
else:
from numpy import inf
self.timeout = inf #XXX: better than defaults.timelimit ?
elif isinstance(self.timeout, str):
from pyina.tools import isoseconds
self.timeout = isoseconds(self.timeout)
if self.workdir == None:
if self.scheduler:
self.workdir = self.scheduler.workdir
else:
self.workdir = os.environ.get('WORKDIR', os.path.curdir)
self.workdir = os.path.abspath(self.workdir)
return
if AbstractWorkerPool.__init__.__doc__: __init__.__doc__ = AbstractWorkerPool.__init__.__doc__ + __init__.__doc__
def __settings(self):
"""apply default settings, then update with given settings"""
env = defaults.copy()
[env.update({k:v}) for (k,v) in self.__dict__.items() if k in defaults]
[env.update({'nodes':v}) for (k,v) in self.__dict__.items() if k.endswith('nodes')] # deal with self.__nodes
return env
def __launch(self, command):
"""launch mechanism for prepared launch command"""
executable = command.split("|")[-1].split()[0]
from pox import which
if not which(executable):
raise IOError("launch failed: %s not found" % executable)
return Popen([command], shell=True) #FIXME: shell=True is insecure
def _launcher(self, kdict={}):
"""prepare launch command based on current settings
def __init__(self, *args, **kwds):
"""\nNOTE: if number of nodes is not given, will default to 1.
If source is not given, will attempt to minimially use TemporaryFiles.
If workdir is not given, will default to scheduler's workdir or $WORKDIR.
If scheduler is not given, will default to only run on the current node.
If timeout is not given, will default to scheduler's timelimit or INF.
For more details, see the docstrings for the "map" method, or the man page
for the associated launcher (e.g mpirun, mpiexec).
"""
AbstractWorkerPool.__init__(self, *args, **kwds)
self.scheduler = kwds.get('scheduler', None)
self.scatter = True #bool(kwds.get('scatter', True))
self.source = bool(kwds.get('source', False))
self.workdir = kwds.get('workdir', None)
self.timeout = kwds.get('timeout', None)
if self.timeout == None:
if self.scheduler:
from pyina.tools import isoseconds
self.timeout = isoseconds(self.scheduler.timelimit)
else:
from numpy import inf
self.timeout = inf #XXX: better than defaults.timelimit ?
elif isinstance(self.timeout, str):
from pyina.tools import isoseconds
self.timeout = isoseconds(self.timeout)
if self.workdir == None:
If scheduler is not given, will default to only run on the current node.
If pickle is not given, will attempt to minimially use TemporaryFiles.
For more details, see the docstrings for the "map" method, or the man page
for the associated launcher (e.g mpirun, mpiexec).
"""
Mapper.__init__(self, *args, **kwds)
self.scatter = bool(kwds.get('scatter', False)) #XXX: hang w/ nodes=1 ?
#self.nodes = kwds.get('nodes', None)
if not len(args) and 'nodes' not in kwds:
if self.scheduler:
self.nodes = self.scheduler.nodes
else:
self.nodes = cpu_count()
return
if AbstractWorkerPool.__init__.__doc__: __init__.__doc__ = AbstractWorkerPool.__init__.__doc__ + __init__.__doc__
def njobs(self, nodes):
"""convert node_string intended for scheduler to int number of nodes
compute int from node string. For example, parallel.njobs("4") yields 4
"""
return int(str(nodes)) #XXX: this is a dummy function
def _launcher(self, kdict={}):
"""prepare launch command for pipe-based execution
equivalent to: (python) (program) (progargs)
NOTES:
run non-python commands with: {'python':'', ...}
"""
mydict = self.settings.copy()
mydict.update(kdict)
elif hasnodes: #XXX: multiple try/except is faster?
if arglen:
msg = "got multiple values for keyword argument 'nodes'"
raise TypeError(msg)
kwds['ncpus'] = kwds.pop('nodes')
elif arglen:
kwds['ncpus'] = args[0]
self.__nodes = kwds.get('ncpus', cpu_count())
# Create an identifier for the pool
self._id = 'pool'
# Create a new server if one isn't already initialized
self._serve()
return
if AbstractWorkerPool.__init__.__doc__: __init__.__doc__ = AbstractWorkerPool.__init__.__doc__ + __init__.__doc__
#def __exit__(self, *args):
# self._clear()
# return
def _serve(self, nodes=None): #XXX: should be STATE method; use id
"""Create a new server if one isn't already initialized"""
if nodes is None: nodes = self.__nodes
_pool = __STATE.get(self._id, None)
if not _pool or nodes != _pool.__nodes:
self._clear()
_pool = Pool(nodes)
_pool.__nodes = nodes
__STATE[self._id] = _pool
return _pool
def _clear(self): #XXX: should be STATE method; use id
"""Remove server with matching state"""
_pool = __STATE.get(self._id, None)
if servers is None: servers = ()
#from _ppserver_config import ppservers as servers # config file
# Create an identifier for the pool
self._id = 'server'
#XXX: throws 'socket.error' when starting > 1 server with autodetect
# Create a new server if one isn't already initialized
# ...and set the requested level of multi-processing
self._exiting = False
_pool = self._serve(nodes=ncpus, servers=servers)
#XXX: or register new UID for each instance?
#_pool.set_ncpus(ncpus or 'autodetect') # no ncpus=0
#print("configure %s local workers" % _pool.get_ncpus())
return
if AbstractWorkerPool.__init__.__doc__: __init__.__doc__ = AbstractWorkerPool.__init__.__doc__ + __init__.__doc__
#def __exit__(self, *args):
# self._clear()
# return
def _serve(self, nodes=None, servers=None): #XXX: is a STATE method; use id
"""Create a new server if one isn't already initialized"""
# get nodes and servers in form used by pp.Server
if nodes is None: nodes = self.nodes #XXX: autodetect must be explicit
if nodes in ['*']: nodes = 'autodetect'
if servers is None:
servers = tuple(sorted(self.__servers)) # no servers is ()
elif servers in ['*', 'autodetect']: servers = ('*',)
# if no server, create one
_pool = __STATE.get(self._id, None)
if not _pool:
_pool = pp.Server(ppservers=servers)
# convert to form returned by pp.Server, then compare
elif hasnodes: #XXX: multiple try/except is faster?
if arglen:
msg = "got multiple values for keyword argument 'nodes'"
raise TypeError(msg)
kwds['nthreads'] = kwds.pop('nodes')
elif arglen:
kwds['nthreads'] = args[0]
self.__nodes = kwds.get('nthreads', cpu_count())
# Create an identifier for the pool
self._id = 'threads'
# Create a new server if one isn't already initialized
self._serve()
return
if AbstractWorkerPool.__init__.__doc__: __init__.__doc__ = AbstractWorkerPool.__init__.__doc__ + __init__.__doc__
#def __exit__(self, *args):
# self._clear()
# return
def _serve(self, nodes=None): #XXX: should be STATE method; use id
"""Create a new server if one isn't already initialized"""
if nodes is None: nodes = self.__nodes
_pool = __STATE.get(self._id, None)
if not _pool or nodes != _pool.__nodes:
self._clear()
_pool = _ThreadPool(nodes)
_pool.__nodes = nodes
__STATE[self._id] = _pool
return _pool
def _clear(self): #XXX: should be STATE method; use id
"""Remove server with matching state"""
_pool = __STATE.get(self._id, None)