Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_mp():
from pathos.pools import ProcessPool as Pool
pool = Pool(nodes=4)
check_sanity( pool )
check_maps( pool, items, delay )
check_dill( pool )
check_ready( pool, maxtries, delay, verbose=False )
def parcompute_example():
dc = PMPExample()
dc2 = PMPExample()
dc3 = PMPExample()
dc4 = PMPExample()
n_datapoints = 100
inp_data = range(n_datapoints)
r1 = dc.threadcompute(inp_data)
assert(len(dc.cache) == n_datapoints)
r2 = dc2.processcompute(inp_data)
assert(len(dc2.cache) == 0)
assert(r1 == r2)
r3 = ProcessPool(4).map(dc3.compute, inp_data)
r4 = ThreadPool(4).map(dc4.compute, inp_data)
ProcessPool.__state__.clear()
ThreadPool.__state__.clear()
assert(r4 == r3 == r2)
assert(len(dc3.cache) == 0)
assert(len(dc4.cache) == n_datapoints)
log.info("Size of threadpooled class caches: {0}, {1}".format(len(dc.cache), len(dc4.cache)))
log.info("Size of processpooled class caches: {0}, {1}".format(len(dc2.cache), len(dc3.cache)))
def to_template(frames, template, regfn, njobs=4, **fnargs):
"""
Given stack of frames (or a FSeq obj) and a template image,
align every frame to template and return a collection of functions,
which take image coordinates and return warped coordinates, which whould align the
image to the template.
"""
if njobs > 1 and _with_pathos_:
pool = ProcessPool(nodes=njobs)
out = pool.map(partial(regfn, template=template, **fnargs), frames)
#pool.close() ## doesn't work when this is active
else:
print('Running in one process')
out = [regfn(img, template, **fnargs) for img in frames]
return out
def apply_warps(warps, frames, njobs=4):
"""
returns result of applying warps for given frames (one warp per frame)
"""
if njobs > 1 :
pool = ProcessPool(nodes=njobs)
out = pool.map(parametric_warp, frames, warps)
#pool.close()
out = np.array(out)
else:
out = np.array([parametric_warp(f,w) for f,w in itt.izip(frames, warps)])
if isinstance(frames, fseq.FrameSequence):
out = fseq.open_seq(out)
out.meta = frames.meta
return out
random_seed(seed)
print("first sequential...")
solver = DifferentialEvolutionSolver2(ND,NP) #XXX: sequential
solver.SetRandomInitialPoints(min=[-100.0]*ND, max=[100.0]*ND)
solver.SetEvaluationLimits(generations=MAX_GENERATIONS)
solver.SetGenerationMonitor(ssow)
solver.Solve(myCost, VTR(TOL), strategy=Best1Exp, \
CrossProbability=CROSS, ScalingFactor=SCALE, disp=1)
print("")
print_solution( solver.bestSolution )
random_seed(seed)
print("\n and now parallel...")
solver2 = DifferentialEvolutionSolver2(ND,NP) #XXX: parallel
solver2.SetMapper(Pool(NNODES).map)
solver2.SetRandomInitialPoints(min=[-100.0]*ND, max=[100.0]*ND)
solver2.SetEvaluationLimits(generations=MAX_GENERATIONS)
solver2.SetGenerationMonitor(psow)
solver2.Solve(myCost, VTR(TOL), strategy=Best1Exp, \
CrossProbability=CROSS, ScalingFactor=SCALE, disp=1)
print("")
print_solution( solver2.bestSolution )
shutdown() # help multiprocessing shutdown all workers
#from pyina.launchers import Mpi as Pool
from pathos.pools import ProcessPool as Pool
#from pool_helper import func_pickle # if fails to pickle, try using a helper
# run optimizer for each subdiameter
lb = [lower + [lower[i]] for i in range(start,end+1)]
ub = [upper + [upper[i]] for i in range(start,end+1)]
nb = [nbins[:] for i in range(start,end+1)]
for i in range(len(nb)): nb[i][-1] = nb[i][i]
cf = [costFactory(i) for i in range(start,end+1)]
#cf = [func_pickle(i) for i in cf]
#cf = [cost.name for cost in cf]
nnodes = len(lb)
#construct cost function and run optimizer
results = Pool(nnodes).map(optimize, cf,lb,ub,nb)
#print("results = %s" % results)
results = list(zip(*results))
diameters = list(results[0])
function_evaluations = list(results[1])
total_func_evals = sum(function_evaluations)
total_diameter = sum(diameters)
print("subdiameters (squared): %s" % diameters)
print("diameter (squared): %s" % total_diameter)
print("func_evals: %s => %s" % (function_evaluations, total_func_evals))
return total_diameter
def processcompute(self, xs):
pool = ProcessPool(4)
results = pool.map(self.compute, xs)
return results