# Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
def test_timeout_decorated_callback(self):
    """Process Spawn ProcessExpired is forwarded to callback."""
    future = critical_decorated()
    future.add_done_callback(self.callback)
    # Give the callback up to one second to fire and record the error.
    self.event.wait(timeout=1)
    self.assertIsInstance(self.exception, ProcessExpired,
                          msg=str(self.exception))
def test_pool_deadlock(self):
    """Process Pool Fork no deadlock if writing worker dies locking channel."""
    # A single worker guarantees the dying process is the one holding the channel.
    with pebble.ProcessPool(max_workers=1) as pool:
        future = pool.schedule(function)
        with self.assertRaises(pebble.ProcessExpired):
            future.result()
def callback(self, future):
    """Done-callback: record *future*'s result or error, then signal the test."""
    try:
        outcome = future.result()
    except (ProcessExpired, RuntimeError, TimeoutError) as exc:
        # Stash the failure for the assertion in the test body.
        self.exception = exc
    else:
        self.results = outcome
    finally:
        # Always wake the waiting test, success or failure.
        self.event.set()
def test_process_pool_expired_worker(self):
    """Process Pool unexpect death of worker raises ProcessExpired."""
    with process.Pool() as pool:
        task = pool.schedule(suicide_function)
        # The worker kills itself; fetching the result must surface ProcessExpired.
        with self.assertRaises(ProcessExpired):
            task.get()
def test_pool_deadlock(self):
    """Process Pool no deadlock if writing worker dies locking channel."""
    with pebble.process.Pool() as pool:
        task = pool.schedule(function)
        # Callable form of assertRaises: task.get must raise, not hang.
        self.assertRaises(pebble.ProcessExpired, task.get)
def test_decorated_dead_process(self):
    """Process Concurrent ProcessExpired is raised if process dies."""
    # The decorated call spawns a process that dies; get() must propagate it.
    with self.assertRaises(ProcessExpired):
        critical_decorated().get()
def test_decorated_dead_process(self):
    """Process Forkserver ProcessExpired is raised if process dies."""
    future = critical_decorated()
    # Callable form: result() must raise once the remote process has died.
    self.assertRaises(ProcessExpired, future.result)
def test_process_pool_expired_worker(self):
    """Process Pool Fork unexpect death of worker raises ProcessExpired."""
    # One worker only, so the scheduled job lands on the process that dies.
    with ProcessPool(max_workers=1) as pool:
        with self.assertRaises(ProcessExpired):
            pool.schedule(suicide_function).result()
read_default_file=const_mysql_config_file(),
charset='utf8mb4') as con:
results = []
with ProcessPool(max_workers=parallel, initializer=init_process,
initargs=(verbose, start_pystuck, RequestManager(parallel))) as pool:
future = pool.map(update_extension, [(archivedir, con, extid, archive) for extid, archive in tups], chunksize=1, timeout=timeout)
iterator = future.result()
for ext_id in ext_ids:
try:
results.append(next(iterator))
except StopIteration:
break
except TimeoutError as error:
log_warning("WorkerException: Processing of %s took longer than %d seconds" % (ext_id, error.args[1]))
results.append(UpdateResult(ext_id, False, None, None, None, None, None, None, None, error))
except ProcessExpired as error:
log_warning("WorkerException: %s (%s), exit code: %d" % (error, ext_id, error.exitcode))
results.append(UpdateResult(ext_id, False, None, None, None, None, None, None, None, error))
except Exception as error:
log_warning("WorkerException: Processing %s raised %s" % (ext_id, error))
log_warning(error.traceback) # Python's traceback of remote process
results.append(UpdateResult(ext_id, False, None, None, None, None, None, None, None, error))
return results
def task_done(future):
    """Done-callback for a scheduled file task.

    On success, appends the worker's (completed, skipped) counts to the
    module-level accumulators.  A worker that timed out or died is logged
    and skipped; any other error is re-raised.  The progress bar advances
    on the non-raising paths, as before.
    """
    try:
        result = future.result()
        total_completed.append(result[0])
        total_skipped.append(result[1])
    except (ProcessExpired, TimeoutError) as e:
        # NOTE(review): message says "timed-out" even for ProcessExpired —
        # presumably e.args[0] is the file name for both; confirm upstream.
        print("File {} timed-out".format(e.args[0]))
    except Exception:
        # Bare `raise` preserves the original traceback; `raise e` added
        # nothing and obscured intent.
        raise
    pbar.update()