# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_mp():
    """Run the shared pool checks against a 4-node multiprocess pool."""
    from pathos.pools import ProcessPool as Pool
    workers = Pool(nodes=4)
    check_sanity(workers)
    check_maps(workers, items, delay)
    check_dill(workers)
    check_ready(workers, maxtries, delay, verbose=False)
def test_pp():
    """Run the timed-map check on a ParallelPool over two local servers."""
    from pathos.pools import ParallelPool as PPP
    endpoints = ('localhost:5653', 'localhost:2414')
    workers = PPP(servers=endpoints)
    outcome = timed_pool(workers, items, delay, verbose)
    assert outcome == std
def test_with_pp():
    """Drive the shared multipool runner using the ParallelPool class."""
    from pathos.pools import ParallelPool
    pool_type = ParallelPool
    run_with_multipool(pool_type)
#!/usr/bin/env python
#
# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
# Copyright (c) 1997-2016 California Institute of Technology.
# Copyright (c) 2016-2020 The Uncertainty Quantification Foundation.
# License: 3-clause BSD. The full license text is available at:
# - https://github.com/uqfoundation/pathos/blob/master/LICENSE
def host(id):
    """Return a 'Rank: <id> -- <hostname>' string for worker *id*."""
    import socket
    node = socket.gethostname()
    return "Rank: %d -- %s" % (id, node)
if __name__ == '__main__':
    from pathos.pools import ThreadPool as TPool

    # Demo: map the same 10-item workload while varying the thread count.
    pool = TPool()

    print("Evaluate 10 items on 1 thread")
    pool.nthreads = 1
    single = pool.map(host, range(10))
    print(pool)
    print('\n'.join(single))
    print('')

    print("Evaluate 10 items on 2 threads")
    pool.nthreads = 2
    double = pool.map(host, range(10))
    print(pool)
    print('\n'.join(double))
    print('')

    print("Evaluate 10 items on ? threads")
def test_tp():
    """Run the shared pool checks against a 4-node thread pool."""
    from pathos.pools import ThreadPool as Pool
    workers = Pool(nodes=4)
    check_sanity(workers)
    check_maps(workers, items, delay)
    check_dill(workers)
    check_ready(workers, maxtries, delay, verbose=False)
def test_threading():
    """Run the timed-map check on a default-sized thread pool."""
    from pathos.pools import ThreadPool as MTP
    workers = MTP()
    outcome = timed_pool(workers, items, delay, verbose)
    assert outcome == std
tuple(_uimap(disable_profiling, range(10))) # in the workers
for i in _uimap(work, range(-20,-10)):
print(i)
"""
# activate profiling, but remove profiling from the worker
enable_profiling()
for i in map(not_profiled(work), range(-30,-20)):
print(i)
# print stats for profile of 'import math' in another process
def import_ppft(*args):
    """Worker-side task: import ppft (the import itself is what gets profiled)."""
    import ppft
import pathos.pools as pp
# Single-worker process pool; its pipe ships the call into the worker process.
pool = pp.ProcessPool(1)
# NOTE(review): 'profile' is defined elsewhere in this file -- presumably the
# pathos profiling decorator; here it wraps import_ppft so the 'import ppft'
# executed in the worker is profiled with cumulative-time stats.
profile('cumulative', pipe=pool.pipe)(import_ppft)
pool.close()  # stop accepting new work
pool.join()   # wait for the worker to finish
pool.clear()  # drop the cached pool instance so later code can rebuild it
# (fragment: tail of a per-category model-training helper; its enclosing
# 'def' is not visible in this chunk -- names below come from that scope)
category_length = relevant_X.shape[0]  # row count of this category's data slice
# Bundle the fitted model with its category label and sample count.
result = {
'trained_category_model': category_trained_final_model
, 'category': category
, 'len_relevant_X': category_length
}
return result
# Train one model per category: serially under the test suite, otherwise in
# parallel via a pathos process pool. (Fragment: enclosing scope not visible.)
if os.environ.get('is_test_suite', False) == 'True':
    # If this is the test_suite, do not run things in parallel
    results = list(map(lambda x: train_one_categorical_model(x[0], x[1], x[2]), categories_and_data))
else:
    pool = pathos.multiprocessing.ProcessPool()
    # Since we may have already closed the pool, try to restart it
    try:
        pool.restart()
    except AssertionError as e:
        # restart() asserts when the pool is already running -- safe to ignore
        pass
    try:
        results = list(pool.map(lambda x: train_one_categorical_model(x[0], x[1], x[2]), categories_and_data))
    except RuntimeError:
        # Deep Learning models require a ton of recursion. I've tried to work around it, but sometimes we just need to brute force the solution here
        original_recursion_limit = sys.getrecursionlimit()
        sys.setrecursionlimit(10000)
        results = list(pool.map(lambda x: train_one_categorical_model(x[0], x[1], x[2]), categories_and_data))
        # restore the interpreter's original recursion limit afterwards
        sys.setrecursionlimit(original_recursion_limit)
def onChild(pid, fromparent, toparent):
    """Child-process handler: dispatch the marshaled request, then handshake
    with the parent over the pipe and exit.

    NOTE(review): 'self', 'data', '_str', '_b', 'logger', and 'print_exc_info'
    are closed over from the enclosing (unseen) scope -- this is a nested
    callback, presumably run in a forked child.
    """
    try:
        # dispatch the marshaled request through the server and send the reply
        response = self._server._marshaled_dispatch(data)
        self._sendResponse(response)
        # handshake: wait for a line from the parent, then acknowledge 'done'
        line = _str(fromparent.readline())
        toparent.write(_b('done\n'))
        toparent.flush()
    except:
        # bare except is deliberate here: the child must never propagate an
        # exception -- log everything, then fall through to the hard exit
        logger(name='pathos.xmlrpc', level=30).error(print_exc_info())
    # hard-exit the child without running atexit/cleanup handlers
    os._exit(0)
def imap(self, f, *args, **kwds):
    # Validate the call via the base-class (name-mangled) argument checker.
    AbstractWorkerPool._AbstractWorkerPool__imap(self, f, *args, **kwds)
    def submit(*argz):
        """send a job to the server"""
        _pool = self._serve()
        #print("using %s local workers" % _pool.get_ncpus())
        try:
            # ship f(*argz) to the ppft server; globals() travels with the job
            return _pool.submit(f, argz, globals=globals())
        except pp.DestroyedServerError:
            # server was torn down -- mark this pool as dead
            self._is_alive(None)
    # submit all jobs, then collect results as they become available
    # (the list() forces all submissions before the first result is awaited)
    return (subproc() for subproc in list(builtins.map(submit, *args)))
# inherit the public docstring from the abstract base's imap
imap.__doc__ = AbstractWorkerPool.imap.__doc__