Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_process_pool_timeout_callback(self):
"""Process Pool Spawn TimeoutError is forwarded to callback."""
with ProcessPool(max_workers=1) as pool:
future = pool.schedule(long_function, timeout=0.1)
future.add_done_callback(self.callback)
self.event.wait()
self.assertTrue(isinstance(self.exception, TimeoutError))
def test_process_pool_callback(self):
"""Process Pool Forkserver result is forwarded to the callback."""
with ProcessPool(max_workers=1) as pool:
future = pool.schedule(
function, args=[1], kwargs={'keyword_argument': 1})
future.add_done_callback(self.callback)
self.event.wait()
self.assertEqual(self.result, 2)
def test_process_pool_future_limit(self):
"""Process Pool Forkserver tasks limit is honored."""
futures = []
with ProcessPool(max_workers=1, max_tasks=2) as pool:
for _ in range(0, 4):
futures.append(pool.schedule(pid_function))
self.assertEqual(len(set([f.result() for f in futures])), 2)
def test_pool_deadlock_stop(self):
"""Process Pool Fork reading deadlocks are stopping the Pool."""
with self.assertRaises(RuntimeError):
pool = pebble.ProcessPool(max_workers=1)
for _ in range(10):
pool.schedule(function)
time.sleep(0.1)
def test_process_pool_map_error(self):
"""Process Pool Spawn errors do not stop the iteration."""
raised = None
elements = [1, 'a', 3]
with ProcessPool(max_workers=1) as pool:
future = pool.map(function, elements)
generator = future.result()
while True:
try:
next(generator)
except TypeError as error:
raised = error
except StopIteration:
break
self.assertTrue(isinstance(raised, TypeError))
def test_process_pool_callback_error(self):
"""Process Pool Spawn does not stop if error in callback."""
with ProcessPool(max_workers=1) as pool:
future = pool.schedule(function, args=[1],
kwargs={'keyword_argument': 1})
future.add_done_callback(self.callback)
# sleep enough to ensure callback is run
time.sleep(0.1)
pool.schedule(function, args=[1],
kwargs={'keyword_argument': 1})
def test_process_pool_join_running(self):
"""Process Pool Fork RuntimeError is raised if active pool joined."""
with ProcessPool(max_workers=1) as pool:
pool.schedule(function, args=[1])
self.assertRaises(RuntimeError, pool.join)
def test_process_pool_timeout(self):
"""Process Pool Fork future raises TimeoutError if so."""
with ProcessPool(max_workers=1) as pool:
future = pool.schedule(long_function, timeout=0.1)
self.assertRaises(TimeoutError, future.result)
def test_process_pool_timeout(self):
"""Process Pool Forkserver future raises TimeoutError if so."""
with ProcessPool(max_workers=1) as pool:
future = pool.schedule(long_function, timeout=0.1)
self.assertRaises(TimeoutError, future.result)
def run_with_timeout(entry_point, timeout, progress, dt=0.1, **kwargs):
# TODO : multi-process over the different tokens
spinner = itertools.cycle(r"\|/-")
pool = pebble.ProcessPool(max_workers=1)
line = elapsed = format_time(0)
with pool:
t0 = time.time()
func = entry_point.load()
future = pool.schedule(func, kwargs=kwargs, timeout=timeout)
while not future.done():
if progress is not None:
line = "\r" + elapsed + " " + progress + " " + next(spinner)
sys.stderr.write(line)
sys.stderr.flush()
time.sleep(dt)
elapsed = format_time(time.time() - t0, timeout)
walltime = time.time() - t0
try:
a, b = future.result()
except Exception as err: