def test_create_job_with_command(job_dummy_cmds):
    """Create dummy job with command."""
    job = pyani_jobs.Job("dummy", job_dummy_cmds[0])
    assert job.script == job_dummy_cmds[0]


def test_add_dependency(job_dummy_cmds):
    """Create dummy job with dependency."""
    job1 = pyani_jobs.Job("dummy_with_dependency", job_dummy_cmds[0])
    job2 = pyani_jobs.Job("dummy_dependency", job_dummy_cmds[1])
    job1.add_dependency(job2)

    dep = job1.dependencies[0]
    assert (job_dummy_cmds[0], job_dummy_cmds[1], 1) == (
        job1.script,
        dep.script,
        len(job1.dependencies),
    )
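These tests exercise only the small surface of pyani_jobs.Job that the later snippets rely on: a script attribute holding the command string, and a dependencies list populated via add_dependency(). As a rough sketch of that interface (not pyani's actual implementation, which carries additional scheduler-related state), it can be pictured like this:

# Minimal sketch of the Job interface assumed by the tests above;
# the real pyani_jobs.Job holds more state than shown here.
class Job:
    def __init__(self, name, command):
        self.name = name            # identifier used when submitting the job
        self.script = command       # command line to execute
        self.dependencies = []      # jobs that must complete before this one

    def add_dependency(self, job):
        """Record another Job that must finish before this one runs."""
        self.dependencies.append(job)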
# If we're in recovery mode, we don't want to repeat a computational
# comparison that already exists, so we check whether the ultimate
# output is in the set of existing files and, if not, we add the jobs
# TODO: something faster than a list search (dict or set?)
# The comparisons collection always gets updated, so that results are
# added to the database whether they come from recovery mode or are run
# in this call of the script.
comparisons.append(Comparison(qid, sid, dcmd, outfname))
if args.recovery and os.path.split(outfname)[-1] in existingfiles:
    logger.debug("Recovering output from %s, not building job", outfname)
else:
    logger.debug("Building job")
    # Build jobs
    njob = pyani_jobs.Job("%s_%06d-n" % (jobprefix, idx), ncmd)
    fjob = pyani_jobs.Job("%s_%06d-f" % (jobprefix, idx), dcmd)
    fjob.add_dependency(njob)
    joblist.append(fjob)
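The dependency wiring follows the ANIm pipeline: ncmd is the NUCmer alignment command and dcmd the delta-filter command that consumes NUCmer's .delta output, so the filter job ("-f") cannot run until the alignment job ("-n") has finished. Purely as an illustration, with made-up command strings rather than pyani's exact output, one comparison might be wired like this:

# Hypothetical command pair for one genome-vs-genome comparison; the real
# strings come from pyani's command-construction helpers.
ncmd = "nucmer --mum -p output/nucmer_output/genA_vs_genB genA.fna genB.fna"
dcmd = (
    "delta-filter -1 output/nucmer_output/genA_vs_genB.delta "
    "> output/nucmer_output/genA_vs_genB.filter"
)

# The filter job depends on the alignment job: the .delta file must exist
# before delta-filter can produce the .filter output used downstream.
njob = pyani_jobs.Job("ANIm_000001-n", ncmd)
fjob = pyani_jobs.Job("ANIm_000001-f", dcmd)
fjob.add_dependency(njob)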
# Pass commands to the appropriate scheduler
logger.info("Passing %d jobs to scheduler", len(joblist))
if args.scheduler == 'multiprocessing':
    logger.info("Running jobs with multiprocessing")
    if not args.workers:
        logger.info("(using maximum number of worker threads)")
    else:
        logger.info("(using %d worker threads, if available)", args.workers)
    cumval = run_mp.run_dependency_graph(joblist, workers=args.workers,
                                         logger=logger)
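Here cumval accumulates the exit codes of all jobs, so the caller can treat any non-zero total as a failure. The real runner lives in pyani's run_multiprocessing module; as a rough sketch of the idea only (assumed structure, not pyani's code), a dependency-ordered multiprocessing runner can look like this:

import subprocess
from multiprocessing import Pool, cpu_count


def flatten_dependency_graph(jobs):
    """Group jobs by dependency depth; deeper levels must run first."""
    depths = {}  # id(job) -> (depth, job)

    def visit(job, depth):
        if depths.get(id(job), (-1, None))[0] < depth:
            depths[id(job)] = (depth, job)
            for dep in job.dependencies:
                visit(dep, depth + 1)

    for job in jobs:
        visit(job, 0)

    batches = {}
    for depth, job in depths.values():
        batches.setdefault(depth, []).append(job)
    return [batches[d] for d in sorted(batches, reverse=True)]


def run_script(script):
    """Run one command line in a shell; return its exit status."""
    return subprocess.call(script, shell=True)


def run_dependency_graph_sketch(jobs, workers=None):
    """Run each dependency level as a multiprocessing batch; sum exit codes."""
    cumval = 0
    with Pool(workers or cpu_count()) as pool:
        for batch in flatten_dependency_graph(jobs):
            cumval += sum(pool.map(run_script, [job.script for job in batch]))
    return cumval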
    outfname = Path(outprefix + ".filter")
    logger.debug("Expected output file for db: %s", outfname)

    # If we're in recovery mode, we don't want to repeat a computational
    # comparison that already exists, so we check whether the ultimate
    # output is in the set of existing files and, if not, we add the jobs
    # TODO: something faster than a list search (dict or set?)
    # The comparisons collection always gets updated, so that results are
    # added to the database whether they come from recovery mode or are run
    # in this call of the script.
    if args.recovery and outfname.name in existingfiles:
        logger.debug("Recovering output from %s, not building job", outfname)
    else:
        logger.debug("Building job")
        # Build jobs
        njob = pyani_jobs.Job("%s_%06d-n" % (args.jobprefix, idx), ncmd)
        fjob = pyani_jobs.Job("%s_%06d-f" % (args.jobprefix, idx), dcmd)
        fjob.add_dependency(njob)
        joblist.append(ComparisonJob(query, subject, dcmd, ncmd, outfname, fjob))
return joblist
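In this newer variant each joblist entry is a ComparisonJob record bundling the query and subject genomes, the two command strings, the expected .filter output file, and the Job with its NUCmer dependency attached. A minimal sketch of such a record, with illustrative field names (check pyani's anim module for the real definition):

from pathlib import Path
from typing import NamedTuple


class ComparisonJob(NamedTuple):
    """One pairwise comparison: commands, expected output, and the Job to run."""

    query: object        # query genome record
    subject: object      # subject genome record
    filtercmd: str       # delta-filter command (dcmd above)
    nucmercmd: str       # NUCmer command (ncmd above)
    outfile: Path        # expected .filter output file
    job: object          # pyani_jobs.Job with its NUCmer dependency attached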
    corresponding to the database creation are contained as dependencies.
    How those jobs are scheduled depends on the scheduler (see
    run_multiprocessing.py, run_sge.py)
    """
    joblist = []  # Holds list of job dependency graphs

    # Get dictionary of database-building jobs
    dbjobdict = build_db_jobs(infiles, blastcmds)

    # Create list of BLAST executable jobs, with dependencies
    jobnum = len(dbjobdict)
    for idx, fname1 in enumerate(fragfiles[:-1]):
        for fname2 in fragfiles[idx + 1 :]:
            jobnum += 1
            jobs = [
                pyani_jobs.Job(
                    f"{blastcmds.prefix}_exe_{jobnum:06d}_a",
                    blastcmds.build_blast_cmd(
                        fname1, fname2.parent / fname2.name.replace("-fragments", "")
                    ),
                ),
                pyani_jobs.Job(
                    f"{blastcmds.prefix}_exe_{jobnum:06d}_b",
                    blastcmds.build_blast_cmd(
                        fname2, fname1.parent / fname1.name.replace("-fragments", "")
                    ),
                ),
            ]
            jobs[0].add_dependency(
                dbjobdict[fname1.parent / fname1.name.replace("-fragments", "")]
            )
            jobs[1].add_dependency(
                dbjobdict[fname2.parent / fname2.name.replace("-fragments", "")]
            )
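Each BLASTN execution job is made dependent on the database-building job for the genome it searches against, so whichever scheduler consumes the graph runs every makeblastdb step before the corresponding searches. As a rough sketch of where dbjobdict comes from (assumed shape only; the helper name build_db_cmd and the keying by input path are illustrative, not a statement of pyani's actual build_db_jobs):

# Sketch: one database-building job per input FASTA, keyed by its path so
# the BLASTN jobs above can look up the job they depend on.
def build_db_jobs_sketch(infiles, blastcmds):
    dbjobdict = {}
    for idx, fname in enumerate(infiles):
        # e.g. a "makeblastdb -dbtype nucl -in ..." command for this genome
        cmd = blastcmds.build_db_cmd(fname)  # assumed helper
        dbjobdict[fname] = pyani_jobs.Job(f"{blastcmds.prefix}_db_{idx:06d}", cmd)
    return dbjobdict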