# submit the jobs
if self.run_local:
    # note: the process pool submission only works in python 3
    with futures.ProcessPoolExecutor(n_jobs) as tp:
        tasks = [tp.submit(self._submit_job, job_id, assignment_path, n_threads)
                 for job_id in range(n_jobs)]
        # block until all jobs are done and re-raise worker exceptions
        [t.result() for t in tasks]
else:
    for job_id in range(n_jobs):
        self._submit_job(job_id, assignment_path, n_threads)
# wait until all jobs are finished
if not self.run_local:
    util.wait_for_jobs('papec')
# check the job outputs
processed_blocks, times = self._collect_outputs(n_jobs)
assert len(processed_blocks) == len(times)
success = len(processed_blocks) == n_blocks
# write the output file if we succeed; otherwise serialize the
# partial results to a different file and raise an exception
if success:
out = self.output()
# TODO does 'out' support with block?
fres = out.open('w')
json.dump({'times': times}, fres)
fres.close()
else:
    log_path = os.path.join(self.tmp_folder, 'write_assignments_partial.json')
    # serialize the partial results and fail loudly
    with open(log_path, 'w') as f:
        json.dump({'times': times, 'processed_blocks': processed_blocks}, f)
    raise RuntimeError("WriteAssignments failed: only %i / %i blocks were processed, "
                       "partial results written to %s" % (len(processed_blocks), n_blocks, log_path))
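# NOTE: hypothetical sketch, not part of the original source. `_collect_outputs`
# is called above but not shown. Assuming each job serializes a json file with
# its processed block ids and runtimes to `self.tmp_folder`, it could look
# roughly like this:
def _collect_outputs(self, n_jobs):
    processed_blocks = []
    times = []
    for job_id in range(n_jobs):
        res_path = os.path.join(self.tmp_folder, 'job_result_%i.json' % job_id)
        # jobs that failed (or are still missing) simply don't contribute results
        if not os.path.exists(res_path):
            continue
        with open(res_path) as f:
            res = json.load(f)
        processed_blocks.extend(res['blocks'])
        times.extend(res['times'])
    return processed_blocks, times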
self._prepare_jobs(n_jobs, n_blocks, block_shape)
# submit the jobs
if self.run_local:
    # note: the process pool submission only works in python 3
    with futures.ProcessPoolExecutor(n_jobs) as tp:
        tasks = [tp.submit(self._submit_job, job_id)
                 for job_id in range(n_jobs)]
        # block until all jobs are done and re-raise worker exceptions
        [t.result() for t in tasks]
else:
    for job_id in range(n_jobs):
        self._submit_job(job_id)
# wait until all jobs are finished
if not self.run_local:
    util.wait_for_jobs('papec')
# check the job outputs
processed_jobs, times = self._collect_outputs(n_jobs)
assert len(processed_jobs) == len(times)
success = len(processed_jobs) == n_jobs
# write the output file if we succeed; otherwise serialize the
# partial results to a different file and raise an exception
if success:
out = self.output()
# TODO does 'out' support with block?
fres = out.open('w')
json.dump({'times': times}, fres)
fres.close()
else:
    log_path = os.path.join(self.tmp_folder, 'find_uniques_partial.json')
    # serialize the partial results and fail loudly
    with open(log_path, 'w') as f:
        json.dump({'times': times, 'processed_jobs': processed_jobs}, f)
    raise RuntimeError("FindUniques failed: only %i / %i jobs were processed, "
                       "partial results written to %s" % (len(processed_jobs), n_jobs, log_path))
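# NOTE: hypothetical sketch, not part of the original source. `_prepare_jobs`
# is called above but not shown. Assuming it distributes the blocks over the
# jobs and serializes one config file per job, it could look roughly like this:
def _prepare_jobs(self, n_jobs, n_blocks, block_shape):
    for job_id in range(n_jobs):
        # round-robin assignment of block ids to this job
        block_list = list(range(job_id, n_blocks, n_jobs))
        job_config = {'block_shape': block_shape, 'block_list': block_list}
        config_path = os.path.join(self.tmp_folder, 'job_config_%i.json' % job_id)
        with open(config_path, 'w') as f:
            json.dump(job_config, f)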
def _submit_jobs(self, n_jobs, prefix):
    from .. import util
    if self.run_local:
        # note: the process pool submission only works in python 3
        with futures.ProcessPoolExecutor(n_jobs) as tp:
            tasks = [tp.submit(self._submit_job, job_id, prefix)
                     for job_id in range(n_jobs)]
            # block until all jobs are done and re-raise worker exceptions
            [t.result() for t in tasks]
    else:
        for job_id in range(n_jobs):
            self._submit_job(job_id, prefix)
    # wait until all jobs are finished
    if not self.run_local:
        util.wait_for_jobs('papec')
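# NOTE: hypothetical sketch, not part of the original source. `util.wait_for_jobs`
# is called throughout this file but its implementation is not shown. Since the
# cluster jobs are submitted via `bsub`, a minimal version could poll the LSF
# queue with `bjobs` until the given user has no unfinished jobs left:
import subprocess
import time

def wait_for_jobs(user, poll_interval=5):
    while True:
        time.sleep(poll_interval)
        # `bjobs -u <user>` lists the user's pending and running jobs on stdout;
        # once everything has finished, stdout is empty
        jobs = subprocess.check_output(['bjobs', '-u', user]).decode().strip()
        if not jobs:
            break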
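# NOTE: hypothetical sketch, not part of the original source. The `blocking`
# object and `roi` used below are not defined in this snippet. Assuming the
# nifty library (whose blocking provides `getBlockIdsOverlappingBoundingBox`),
# they could be set up roughly like this; `shape` and `block_shape` are
# placeholders for the actual volume and block shapes:
import nifty.tools as nt
shape = [512, 512, 512]     # placeholder volume shape
block_shape = [64, 64, 64]  # placeholder block shape
blocking = nt.blocking(roiBegin=[0, 0, 0], roiEnd=shape, blockShape=block_shape)
roi = ([0, 0, 0], shape)    # placeholder region of interest (begin, end)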
block_list = blocking.getBlockIdsOverlappingBoundingBox(roi[0], roi[1], [0, 0, 0])
n_blocks = len(block_list)
n_jobs = min(n_blocks, self.max_jobs)
command = '%s %s %s %i %s %s' % (script_path, self.path, self.out_key,
n_jobs, self.config_path, self.tmp_folder)
log_file = os.path.join(self.tmp_folder, 'logs', 'log_merges.log')
err_file = os.path.join(self.tmp_folder, 'error_logs', 'err_merges.err')
bsub_command = 'bsub -n %i -J compute_merges -We %i -o %s -e %s \'%s\'' % (n_threads,
self.time_estimate,
log_file, err_file, command)
if self.run_local:
    subprocess.call(command, shell=True)
else:
    subprocess.call(bsub_command, shell=True)
    util.wait_for_jobs('papec')
# check if all output was produced
out_path = self.output().path
try:
success = True
assert os.path.exists(out_path)
res_path = os.path.join(self.tmp_folder, 'time_consensus_stitching.json')
# make sure the timing result was serialized properly
with open(res_path) as f:
    json.load(f)['t']
except Exception:
success = False
# clean up output
rmtree(out_path)
if not success:
raise RuntimeError('MergesTask failed')
script_path = os.path.join(self.tmp_folder, 'compute_block_offsets.py')
command = '%s %s %s' % (script_path, in_path, out_path)
log_file = os.path.join(self.tmp_folder, 'logs', 'log_compute_offsets.log')
err_file = os.path.join(self.tmp_folder, 'error_logs', 'err_compute_offsets.err')
bsub_command = 'bsub -J compute_offsets -We %i -o %s -e %s \'%s\'' % (self.time_estimate,
log_file, err_file, command)
# submit the job
if self.run_local:
    subprocess.call(command, shell=True)
else:
    subprocess.call(bsub_command, shell=True)
# wait until the job is finished
if not self.run_local:
    util.wait_for_jobs('papec')
# check for correct execution
success = os.path.exists(out_path)
if not success:
raise RuntimeError("Compute offsets failed")
with open(config_path, 'w') as f:
json.dump({'n_threads': n_threads}, f)
# submit the job
command = '%s %s %i %s' % (script_path, self.tmp_folder, n_jobs, config_path)
log_file = os.path.join(self.tmp_folder, 'logs', 'log_node_assignment.log')
err_file = os.path.join(self.tmp_folder, 'error_logs', 'err_node_assignment.err')
bsub_command = 'bsub -n %i -J node_assignment -We %i -o %s -e %s \'%s\'' % (n_threads, self.time_estimate,
                                                                            log_file, err_file, command)
if self.run_local:
    subprocess.call(command, shell=True)
else:
    subprocess.call(bsub_command, shell=True)
# wait until all jobs are finished
if not self.run_local:
    util.wait_for_jobs('papec')
# check for correct execution
out_path = self.output().path
success = os.path.exists(out_path)
if not success:
raise RuntimeError("Compute node assignment failed")