Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
sync()
result = f(*args, **kwargs)
if device_to_host_transfer_function is not None:
result = device_to_host_transfer_function(result)
sync()
return result
result = initial_value
if self.multiprocessing_pool_type == MultiprocessingPoolType.LOKY:
from loky import as_completed
futures = [self._executor.submit(synced_f, *args, **kwargs)
for i, (args, kwargs)
in enumerate(zip(args_list, kwargs_list))]
for future in as_completed(futures):
result = reduction(result, future.result())
# result = reduce_with_none(result, future.result(), reduction)
elif self.multiprocessing_pool_type == MultiprocessingPoolType.PATHOS:
futures = [self._executor.apipe(synced_f, *args, **kwargs)
for args, kwargs
in zip(args_list, kwargs_list)]
for future in futures:
result = reduction(result, future.get())
# result = reduce_with_none(result, future.get(), reduction)
else:
raise ValueError(f'Multiprocessing pool type {self.multiprocessing_pool_type} not supported')
return result