# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create_job():
    """Instantiate a minimal Job and confirm its script is empty."""
    empty_job = pyani_jobs.Job("empty", "")
    assert "" == empty_job.script
def test_cmdsets(self):
    """Test that module builds command sets."""
    # Build a two-job dependency chain: parent depends on child.
    parent = pyani_jobs.Job("dummy_with_dependency", self.cmds[0])
    child = pyani_jobs.Job("dummy_dependency", self.cmds[1])
    parent.add_dependency(child)
    # Walking the graph from the parent should yield one singleton
    # command set per command, in self.cmds order.
    result = run_multiprocessing.populate_cmdsets(parent, [], depth=1)
    expected = [{command} for command in self.cmds]
    self.assertEqual(result, expected)
# If we're in recovery mode, we don't want to repeat a computational
# comparison that already exists, so we check whether the ultimate
# output is in the set of existing files and, if not, we add the jobs
# TODO: something faster than a list search (dict or set?)
# The comparisons collections always gets updated, so that results are
# added to the database whether they come from recovery mode or are run
# in this call of the script.
comparisons.append(Comparison(qid, sid, dcmd, outfname))
if args.recovery and os.path.split(outfname)[-1] in existingfiles:
logger.debug("Recovering output from %s, not building job",
outfname)
else:
logger.debug("Building job")
# Build jobs
njob = pyani_jobs.Job("%s_%06d-n" % (jobprefix, idx), ncmd)
fjob = pyani_jobs.Job("%s_%06d-f" % (jobprefix, idx), dcmd)
fjob.add_dependency(njob)
joblist.append(fjob)
# Pass commands to the appropriate scheduler
logger.info("Passing %d jobs to scheduler", len(joblist))
if args.scheduler == 'multiprocessing':
logger.info("Running jobs with multiprocessing")
if not args.workers:
logger.info("(using maximum number of worker threads)")
else:
logger.info("(using %d worker threads, if available)",
args.workers)
cumval = run_mp.run_dependency_graph(joblist,
workers=args.workers,
logger=logger)
if 0 < cumval:
:param filenames: Iterable, Paths to input FASTA files
:param outdir: str, path to output directory
:param nucmer_exe: str, location of the nucmer binary
:param filter_exe:
:param maxmatch: Boolean flag indicating to use NUCmer's -maxmatch option
:param jobprefix:
Loop over all FASTA files, generating Jobs describing NUCmer command lines
for each pairwise comparison.
"""
ncmds, fcmds = generate_nucmer_commands(
filenames, outdir, nucmer_exe, filter_exe, maxmatch
)
joblist = []
for idx, ncmd in enumerate(ncmds):
njob = pyani_jobs.Job(f"{jobprefix}_{idx:06d}-n", ncmd)
fjob = pyani_jobs.Job(f"{jobprefix}_{idx:06d}-f", fcmds[idx])
fjob.add_dependency(njob)
joblist.append(fjob)
return joblist
def build_db_jobs(infiles: List[Path], blastcmds: BLASTcmds) -> Dict:
    """Return dictionary of db-building commands, keyed by dbname.

    :param infiles:  iterable of paths to input FASTA files
    :param blastcmds:  BLASTcmds object supplying the job-name prefix and
        the ``get_db_name``/``build_db_cmd`` methods used below

    Each value is a pyani_jobs.Job wrapping the database-construction
    command for the corresponding input file.
    """
    dbjobdict = {}  # Dict of database construction jobs, keyed by db name
    # Create dictionary of database building jobs, keyed by db name
    # defining jobnum for later use as last job index used
    for idx, fname in enumerate(infiles):
        # NOTE(review): format spec changed ":06" -> ":06d" for consistency
        # with the other job-name templates in this module; output is
        # identical for integer idx.
        dbjobdict[blastcmds.get_db_name(fname)] = pyani_jobs.Job(
            f"{blastcmds.prefix}_db_{idx:06d}", blastcmds.build_db_cmd(fname)
        )
    return dbjobdict
logger.debug("Expected output file for db: %s", outfname)
# If we're in recovery mode, we don't want to repeat a computational
# comparison that already exists, so we check whether the ultimate
# output is in the set of existing files and, if not, we add the jobs
# TODO: something faster than a list search (dict or set?)
# The comparisons collections always gets updated, so that results are
# added to the database whether they come from recovery mode or are run
# in this call of the script.
if args.recovery and outfname.name in existingfiles:
logger.debug("Recovering output from %s, not building job", outfname)
else:
logger.debug("Building job")
# Build jobs
njob = pyani_jobs.Job("%s_%06d-n" % (args.jobprefix, idx), ncmd)
fjob = pyani_jobs.Job("%s_%06d-f" % (args.jobprefix, idx), dcmd)
fjob.add_dependency(njob)
joblist.append(ComparisonJob(query, subject, dcmd, ncmd, outfname, fjob))
return joblist
update_comparison_matrices,
)
# Convenience struct describing a pairwise comparison job for the SQLAlchemy
# implementation
class ComparisonJob(NamedTuple):
    """Pairwise comparison job for the SQLAlchemy implementation."""

    # Identifiers for the query and subject genomes in this comparison
    query: str
    subject: str
    # Command lines for the delta-filter and nucmer steps (populated from
    # dcmd and ncmd respectively at the construction site)
    filtercmd: str
    nucmercmd: str
    # Path to the expected comparison output file
    outfile: Path
    # Scheduler Job for the filter step; it carries a dependency on the
    # corresponding nucmer job
    job: pyani_jobs.Job
# Convenience struct describing an analysis run
class RunData(NamedTuple):
    """Convenience struct describing an analysis run."""

    # Analysis method for the run — presumably the comparison method name;
    # TODO confirm against callers
    method: str
    # Name assigned to the run
    name: str
    # Timestamp associated with the run
    date: datetime.datetime
    # Command line used to invoke the run
    cmdline: str
class ComparisonResult(NamedTuple):
"""Convenience struct for a single nucmer comparison result."""
:param outdir: str, path to output directory
:param nucmer_exe: str, location of the nucmer binary
:param filter_exe:
:param maxmatch: Boolean flag indicating to use NUCmer's -maxmatch option
:param jobprefix:
Loop over all FASTA files, generating Jobs describing NUCmer command lines
for each pairwise comparison.
"""
ncmds, fcmds = generate_nucmer_commands(
filenames, outdir, nucmer_exe, filter_exe, maxmatch
)
joblist = []
for idx, ncmd in enumerate(ncmds):
njob = pyani_jobs.Job(f"{jobprefix}_{idx:06d}-n", ncmd)
fjob = pyani_jobs.Job(f"{jobprefix}_{idx:06d}-f", fcmds[idx])
fjob.add_dependency(njob)
joblist.append(fjob)
return joblist
# Get dictionary of database-building jobs
dbjobdict = build_db_jobs(infiles, blastcmds)
# Create list of BLAST executable jobs, with dependencies
jobnum = len(dbjobdict)
for idx, fname1 in enumerate(fragfiles[:-1]):
for fname2 in fragfiles[idx + 1 :]:
jobnum += 1
jobs = [
pyani_jobs.Job(
f"{blastcmds.prefix}_exe_{jobnum:06d}_a",
blastcmds.build_blast_cmd(
fname1, fname2.parent / fname2.name.replace("-fragments", "")
),
),
pyani_jobs.Job(
f"{blastcmds.prefix}_exe_{jobnum:06d}_b",
blastcmds.build_blast_cmd(
fname2, fname1.parent / fname1.name.replace("-fragments", "")
),
),
]
jobs[0].add_dependency(
dbjobdict[fname1.parent / fname1.name.replace("-fragments", "")]
)
jobs[1].add_dependency(
dbjobdict[fname2.parent / fname2.name.replace("-fragments", "")]
)
joblist.extend(jobs)
# Return the dependency graph
return joblist