Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_process_pool_task_limit(self):
    """Process Pool honors the configured task limit.

    Four ``pid_function`` tasks are scheduled on a pool created with
    ``task_limit=2``; exactly two distinct results are expected.
    NOTE(review): ``pid_function`` presumably returns the worker PID, so
    two distinct values would mean each worker served two tasks -- confirm
    against its definition.
    """
    with process.Pool(task_limit=2) as pool:
        tasks = [pool.schedule(pid_function) for _ in range(4)]
    # Collapse the results into a set to count the distinct values.
    distinct_results = {task.get() for task in tasks}
    self.assertEqual(len(distinct_results), 2)
def test_process_pool_join_tasks_timeout(self):
    """Process Pool raises TimeoutError when a timed join finds running tasks.

    Two ``long_function`` tasks are scheduled, the pool is closed, and
    ``pool.join(0.4)`` is expected to raise ``TimeoutError``.
    """
    pool = process.Pool()
    try:
        for _ in range(2):
            pool.schedule(long_function)
        pool.close()
        self.assertRaises(TimeoutError, pool.join, 0.4)
    finally:
        # Always tear the pool down, even when scheduling or the assertion
        # above fails; otherwise the still-running workers would be left
        # behind and could interfere with subsequent tests.
        pool.stop()
        pool.join()
def test_process_pool_callback_error(self):
    """Process Pool keeps accepting tasks after scheduling one with ``error_callback``.

    The first task is scheduled with ``error_callback`` as its callback,
    the second one without any callback. Any exception raised while
    scheduling fails the test.
    NOTE(review): ``error_callback`` presumably raises when invoked --
    confirm against its definition.
    """
    # Per-call extra options: first call carries the callback, second does not.
    per_call_options = ({'callback': error_callback}, {})
    with process.Pool() as pool:
        try:
            for extra_options in per_call_options:
                pool.schedule(function, args=[1],
                              kwargs={'keyword_argument': 1},
                              **extra_options)
        except Exception:
            self.fail("Error raised")
def test_process_pool_stop_timeout(self):
    """Process Pool workers are replaced when a task times out.

    A ``pid_function`` task is scheduled before and after a
    ``long_function`` task that has a 0.1 timeout; the two
    ``pid_function`` results must differ.
    """
    with process.Pool() as pool:
        before_timeout = pool.schedule(pid_function)
        pool.schedule(long_function, timeout=0.1)
        after_timeout = pool.schedule(pid_function)
    first_result = before_timeout.get()
    second_result = after_timeout.get()
    self.assertNotEqual(first_result, second_result)
def test_process_pool_schedule_id(self):
    """Process Pool exposes the identifier given to ``schedule`` as ``task.id``."""
    with process.Pool() as pool:
        scheduled = pool.schedule(function, identifier='foo', args=[1])
    self.assertEqual(scheduled.id, 'foo')
def test_process_pool_timeout(self):
    """Process Pool task raises TimeoutError from ``get`` when it times out.

    ``long_function`` is scheduled with a 0.1 timeout and fetching its
    result is expected to raise ``TimeoutError``.
    """
    with process.Pool() as pool:
        slow_task = pool.schedule(long_function, timeout=0.1)
    fetch_result = slow_task.get
    self.assertRaises(TimeoutError, fetch_result)
def test_process_pool_single_task(self):
    """Process Pool runs a single task and returns its result.

    ``function`` is called with positional argument ``1`` and
    ``keyword_argument=1``; the expected result is ``2``.
    """
    with process.Pool() as pool:
        keyword_arguments = {'keyword_argument': 1}
        task = pool.schedule(function, args=[1], kwargs=keyword_arguments)
    outcome = task.get()
    self.assertEqual(outcome, 2)
def test_process_pool_join_running(self):
    """Process Pool raises RuntimeError when joined while still active."""
    with process.Pool() as pool:
        pool.schedule(function, args=[1])
        # The pool has been neither closed nor stopped at this point.
        join_pool = pool.join
        self.assertRaises(RuntimeError, join_pool)
def test_process_pool_stop_tasks(self):
    """Process Pool leaves some tasks unperformed when stopped.

    Ten tasks are scheduled, the pool is stopped and joined right away,
    and at least one task must report ``ready`` as false.
    """
    pool = process.Pool()
    tasks = [pool.schedule(function, args=[index]) for index in range(10)]
    pool.stop()
    pool.join()
    unfinished = [task for task in tasks if not task.ready]
    self.assertTrue(len(unfinished) > 0)
if failed > 0:
logging.warning("Failed loading {} jsons".format(failed))
logging.info("Loaded {} jsons from {}".format(len(D), cache))
elif isinstance(cache, dict):
logging.warning("Partial saving is not supported for dict. This means all of the computation is thrown away"
"if any jobs raises error.")
D = cache
else:
raise NotImplementedError("Not recognized cache type")
# Filter out calculated jobs
logging.info("Skipping {}/{} calculated jobs".format(sum(k in D for k in keys), len(keys)))
jobs, keys = [j for k, j in zip(keys, jobs) if k not in D], [k for k in keys if k not in D]
# Run everything
pool = Pool(n_jobs)
time_elapsed = 0
try:
with tqdm.tqdm(total=len(jobs)) as bar:
scheduled_jobs = []
# Schedule jobs
for j in jobs:
if isinstance(j, list) or isinstance(j, tuple):
scheduled_jobs.append(pool.schedule(fnc, args=j, timeout=job_timeout))
elif isinstance(j, dict):
scheduled_jobs.append(pool.schedule(fnc, kwargs=j, timeout=job_timeout))
else:
scheduled_jobs.append(pool.schedule(fnc, args=[j], timeout=job_timeout))
# Run calculation
while True: