Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
else:
raise ValueError('Number_of_batches must be defined if '
'both args_list and kwargs_list are empty')
if args_list is None:
args_list = number_of_batches * [list()]
if kwargs_list is None:
kwargs_list = number_of_batches * [dict()]
result = initial_value
if multiprocessing_pool_type == MultiprocessingPoolType.LOKY:
from concurrent.futures import as_completed
from loky import get_reusable_executor
executor = \
get_reusable_executor(timeout=None,
context='loky')
futures = [executor.submit(f, *args, **kwargs)
for args, kwargs
in zip(args_list, kwargs_list)]
result_from_future = lambda x: x.result()
elif multiprocessing_pool_type == MultiprocessingPoolType.PATHOS:
from pathos.pools import ProcessPool
pool = ProcessPool()
futures = [pool.apipe(f, *args, **kwargs)
for args, kwargs
in zip(args_list, kwargs_list)]
result_from_future = lambda x: x.get()
else:
results = [task(minibatch) for minibatch in paral_params]
else:
if backend=="multiprocessing":
with closing(multiprocessing.Pool(max(1, procs), maxtasksperchild=2)) as pool:
results = pool.map_async(task, paral_params)
pool.close()
pool.join()
results= results.get()
elif backend=="threading":
with closing(multiprocessing.dummy.Pool(max(1,procs))) as pool:
results= pool.map(task, paral_params)
pool.close()
pool.join()
if backend=="loky":
from loky import get_reusable_executor
pool= get_reusable_executor(max_workers=max(1, procs))
results= list(pool.map(task, paral_params))
elif backend == "dask":
###if not (input_split): data= self.scatter(data)
results = [self.backend_handle.submit(task, params) for params in paral_params]
elif backend == "spark":
def apply_func_to_indexedrdd(batch):
return [batch[0]] + [task([batch[1]] + args)]
results = paral_params.map(apply_func_to_indexedrdd)
elif backend == "ray":
import ray
@ray.remote
def f_ray(f, data):
return f(data)
results = [f_ray.remote(task, params) for params in paral_params]
results = [self.backend_handle.get(x) for x in results] #Slower, but handles edge cases
#results= self.backend_handle.get(results) #Faster, but crashes on edge cases?
progress_bar = BaseProgressBar()
if progress_bar is True:
progress_bar = TextProgressBar()
progress_bar.start(len(values))
nfinished = [0]
def _update_progress_bar(x):
nfinished[0] += 1
progress_bar.update(nfinished[0])
if USE_LOKY:
Executor = LokyReusableExecutor
if USE_THREADPOOL_LIMITS:
Executor = partial(
LokyReusableExecutor,
initializer=_process_threadpool_limits_initializier,
)
else:
Executor = ProcessPoolExecutor
_threadpool_limits = _no_threadpool_limits
if USE_THREADPOOL_LIMITS:
_threadpool_limits = threadpool_limits
with _threadpool_limits(limits=1):
with Executor(max_workers=num_cpus) as executor:
jobs = []
try:
for value in values:
args = (value,) + tuple(task_args)
job = executor.submit(task, *args, **task_kwargs)
if exclude_intel_devices:
compute_devices = \
filter(lambda x: 'intel' not in x.name.lower(),
[compute_device
for compute_device
in self._compute_devices])
self._compute_devices = frozenset(compute_devices)
# ctx = multiprocessing.get_context("spawn")
# self._executor = ProcessPoolExecutor(max_workers=self._n_gpus,
# mp_context=ctx)
if multiprocessing_pool_type == MultiprocessingPoolType.LOKY:
from loky import get_reusable_executor, wait
self._executor = get_reusable_executor(max_workers=self.number_of_devices,
timeout=None,
context='loky')
futures = [self._executor.submit(_init_gpu_in_process,
device_id=compute_device.id)
for compute_device
in self._compute_devices]
wait(futures)
[future.result() for future in futures]
elif multiprocessing_pool_type == MultiprocessingPoolType.PATHOS:
from pathos.pools import ProcessPool
self._executor = ProcessPool(nodes=self.number_of_devices)
futures = [self._executor.apipe(_init_gpu_in_process, device_id=compute_device.id)
num_cpus = multiprocessing.cpu_count()
if progress_bar is None:
progress_bar = BaseProgressBar()
if progress_bar is True:
progress_bar = TextProgressBar()
progress_bar.start(len(values))
nfinished = [0]
def _update_progress_bar(x):
nfinished[0] += 1
progress_bar.update(nfinished[0])
if USE_LOKY:
Executor = LokyReusableExecutor
if USE_THREADPOOL_LIMITS:
Executor = partial(
LokyReusableExecutor,
initializer=_process_threadpool_limits_initializier,
)
else:
Executor = ProcessPoolExecutor
_threadpool_limits = _no_threadpool_limits
if USE_THREADPOOL_LIMITS:
_threadpool_limits = threadpool_limits
with _threadpool_limits(limits=1):
with Executor(max_workers=num_cpus) as executor:
jobs = []
try:
def parallel_map(func, args, num_proc):
"""Run function for all arguments using multiple processes."""
num_proc = min(num_proc, len(args))
if num_proc <= 1:
return list(map(func, args))
else:
with get_reusable_executor(max_workers=num_proc, timeout=None) as e:
return list(e.map(func, args))