#!/usr/bin/env python
#
# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
# Copyright (c) 1997-2016 California Institute of Technology.
# Copyright (c) 2016-2020 The Uncertainty Quantification Foundation.
# License: 3-clause BSD. The full license text is available at:
# - https://github.com/uqfoundation/pathos/blob/master/LICENSE
def host(id):
    import socket
    return "Rank: %d -- %s" % (id, socket.gethostname())

if __name__ == '__main__':
    from pathos.pools import ThreadPool as TPool
    tpool = TPool()

    print("Evaluate 10 items on 1 thread")
    tpool.nthreads = 1
    res3 = tpool.map(host, range(10))
    print(tpool)
    print('\n'.join(res3))
    print('')

    print("Evaluate 10 items on 2 threads")
    tpool.nthreads = 2
    res5 = tpool.map(host, range(10))
    print(tpool)
    print('\n'.join(res5))
    print('')

    print("Evaluate 10 items on ? threads")
def test_tp():
    from pathos.pools import ThreadPool as Pool
    pool = Pool(nodes=4)
    check_sanity(pool)
    check_maps(pool, items, delay)
    check_dill(pool)
    check_ready(pool, maxtries, delay, verbose=False)

def test_threading():
    from pathos.pools import ThreadPool as MTP
    pool = MTP()
    res = timed_pool(pool, items, delay, verbose)
    assert res == std
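# check_sanity, check_maps, check_dill, check_ready, items, delay,
# maxtries, verbose, and std are fixtures defined elsewhere in the
# original test module. As a rough illustration of the contract the
# assert above relies on, a hypothetical timed_pool might look like
# this (a sketch, not the original helper):

def timed_pool(pool, items=100, delay=0.02, verbose=False):
    """map a slow function over the pool, optionally reporting time"""
    import time
    def busy(x):
        time.sleep(delay)       # simulate per-item work
        return x*x
    start = time.time()
    res = pool.map(busy, range(items))
    if verbose:
        print("%s: %.3f s" % (pool, time.time() - start))
    return res                  # std would then be [x*x for x in range(items)]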
dc = PMPExample()
dc2 = PMPExample()
dc3 = PMPExample()
dc4 = PMPExample()
n_datapoints = 100
inp_data = range(n_datapoints)

r1 = dc.threadcompute(inp_data)
assert len(dc.cache) == n_datapoints

r2 = dc2.processcompute(inp_data)
assert len(dc2.cache) == 0
assert r1 == r2

r3 = ProcessPool(4).map(dc3.compute, inp_data)
r4 = ThreadPool(4).map(dc4.compute, inp_data)
ProcessPool.__state__.clear()
ThreadPool.__state__.clear()
assert r4 == r3 == r2
assert len(dc3.cache) == 0
assert len(dc4.cache) == n_datapoints

log.info("Size of threadpooled class caches: {0}, {1}".format(len(dc.cache), len(dc4.cache)))
log.info("Size of processpooled class caches: {0}, {1}".format(len(dc2.cache), len(dc3.cache)))
def h(x):
    # 'g' (used here and below) is a one-argument function defined
    # elsewhere in the original excerpt
    return sum(tmap(g, x))

def f(x,y):
    return x*y

x = range(10)
y = range(5)

if __name__ == '__main__':
    from pathos.helpers import freeze_support, shutdown
    freeze_support()
    from pathos.pools import ProcessPool, ThreadPool
    amap = ProcessPool().amap
    tmap = ThreadPool().map
    print(amap(f, [h(x),h(x),h(x),h(x),h(x)], y).get())

    def _f(m, g, x, y):
        return sum(m(g,x))*y
    print(amap(_f, [tmap]*len(y), [g]*len(y), [x]*len(y), y).get())

    from math import sin, cos
    print(amap(tmap, [sin,cos], [x,x]).get())
    shutdown()
#from pathos.pools import ProcessPool as Pool
from pathos.pools import ThreadPool as Pool
#from pool_helper import func_pickle  # if fails to pickle, try using a helper

# NOTE: this excerpt is the body of a driver function; start, end,
# lower, upper, nbins, costFactory, and optimize are defined in the
# elided context.

# run the optimizer for each subdiameter
lb = [lower + [lower[i]] for i in range(start,end+1)]
ub = [upper + [upper[i]] for i in range(start,end+1)]
nb = [nbins[:] for i in range(start,end+1)]
for i in range(len(nb)): nb[i][-1] = nb[i][i]
cf = [costFactory(i) for i in range(start,end+1)]
#cf = [func_pickle(i) for i in cf]
#cf = [cost.name for cost in cf]
nnodes = len(lb)

# construct the cost functions and run the optimizer in parallel
results = Pool(nnodes).map(optimize, cf, lb, ub, nb)
#print("results = %s" % results)
results = list(zip(*results))
diameters = list(results[0])
function_evaluations = list(results[1])
total_func_evals = sum(function_evaluations)
total_diameter = sum(diameters)
print("subdiameters (squared): %s" % diameters)
print("diameter (squared): %s" % total_diameter)
print("func_evals: %s => %s" % (function_evaluations, total_func_evals))
return total_diameter
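# Pool(nnodes).map above zips cf, lb, ub, and nb, calling
# optimize(cost, lb, ub, nbins) once per subdiameter. The real
# optimize is defined elsewhere in the original; a hypothetical stub
# with the assumed signature and return shape:

def optimize(cost, lb, ub, nbins):
    # ... run a solver (e.g. from mystic) on 'cost' over bounds [lb, ub] ...
    solved, func_evals = 0.0, 0    # placeholder results
    return solved, func_evals      # unpacked above as (diameter, evals)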
print("Output: %s\n" % np.asarray(y))
if HAS_PYINA:
# map sin2 to the workers, then print to screen
print("Running mpi4py on %d cores..." % nodes)
y = MpiPool(nodes).map(sin2, x)
print("Output: %s\n" % np.asarray(y))
# map sin2 to the workers, then print to screen
print("Running multiprocesing on %d processors..." % nodes)
y = ProcessPool(nodes).map(sin2, x)
print("Output: %s\n" % np.asarray(y))
# map sin2 to the workers, then print to screen
print("Running multiprocesing on %d threads..." % nodes)
y = ThreadPool(nodes).map(sin2, x)
print("Output: %s\n" % np.asarray(y))
# map sin2 to the workers, then print to screen
print("Running parallelpython on %d cpus..." % nodes)
y = ParallelPool(nodes).map(sin2, x)
print("Output: %s\n" % np.asarray(y))
# ensure all pools shutdown
shutdown()
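# For reference, a minimal preamble that would make the fragment above
# self-contained. Names are inferred from usage, so this is a sketch of
# the assumed setup, not the original: HAS_PYINA guards the optional
# pyina/MPI backend, and nodes, x, and sin2 are placeholders.

import numpy as np
from pathos.pools import ProcessPool, ThreadPool, ParallelPool
from pathos.helpers import shutdown
try:
    from pyina.launchers import Mpi as MpiPool
    HAS_PYINA = True
except ImportError:
    HAS_PYINA = False

nodes = 2                       # assumed worker count
x = np.arange(10)               # assumed input data

def sin2(xi):
    return np.sin(xi)**2        # assumed mapped function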