Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
We expect ANIb results to be quite different, as the BLASTN
algorithm changed substantially between BLAST and BLAST+ (the
megaBLAST algorithm is now the default for BLASTN)
"""
# NOTE(review): excerpt from a test function whose signature is not visible
# here -- paths_concordance_fna, tmp_path, fragment_length and
# threshold_anib_lo_hi are presumably pytest fixtures (TODO: confirm).
# Get lengths of input genomes
orglengths = pyani_files.get_sequence_lengths(paths_concordance_fna)
# Build and run BLAST jobs
# Split each input genome into fragments of fragment_length (ANIb method),
# then build the BLAST database/query job dependency graph.
fragfiles, fraglengths = anib.fragment_fasta_files(
paths_concordance_fna, tmp_path, fragment_length
)
jobgraph = anib.make_job_graph(
paths_concordance_fna, fragfiles, anib.make_blastcmd_builder("ANIb", tmp_path)
)
# run_dependency_graph returns a cumulative exit value; 0 means every job
# in the graph completed successfully.
assert 0 == run_mp.run_dependency_graph(jobgraph)  # Jobs must run correctly
# Process BLAST output
result_pid = anib.process_blast(
tmp_path, orglengths, fraglengths, mode="ANIb"
).percentage_identity
# Compare JSpecies output to results. We do this in two blocks,
# masked according to whether the expected result is greater than
# a threshold separating "low" from "high" identity comparisons.
# Align row/column order and convert fractional identity to a percentage
# so values are directly comparable with the JSpecies reference.
result_pid = result_pid.sort_index(axis=0).sort_index(axis=1) * 100.0
# mask() replaces values on the wrong side of the threshold with NaN, and
# fillna(0) zeroes them, so each array isolates one identity band.
lo_result = result_pid.mask(result_pid >= threshold_anib_lo_hi).fillna(0).values
hi_result = result_pid.mask(result_pid < threshold_anib_lo_hi).fillna(0).values
tgt_pid = parse_jspecies(path_concordance_jspecies)["ANIb"]
lo_target = tgt_pid.mask(tgt_pid >= threshold_anib_lo_hi).fillna(0).values
hi_target = tgt_pid.mask(tgt_pid < threshold_anib_lo_hi).fillna(0).values
# NOTE(review): the def line and earlier parameters of this test are not
# visible in this excerpt; only the final parameter and the body appear.
tmp_path,
):
"""Check ANIblastall results are concordant with JSpecies."""
# Get lengths of input genomes
orglengths = pyani_files.get_sequence_lengths(paths_concordance_fna)
# Perform ANIblastall on the input directory contents
# Fragment the genomes, then build the legacy-blastall job dependency graph.
fragfiles, fraglengths = anib.fragment_fasta_files(
paths_concordance_fna, tmp_path, fragment_length
)
jobgraph = anib.make_job_graph(
paths_concordance_fna,
fragfiles,
anib.make_blastcmd_builder("ANIblastall", tmp_path),
)
# Cumulative return value of 0 means all jobs in the graph succeeded.
assert 0 == run_mp.run_dependency_graph(jobgraph)  # Jobs must run correctly
# Process BLAST output
result_pid = anib.process_blast(
tmp_path, orglengths, fraglengths, mode="ANIblastall"
).percentage_identity
# Compare JSpecies output to results
# Align row/column order and convert fractional identity to percentages.
result_pid = (result_pid.sort_index(axis=0).sort_index(axis=1) * 100.0).values
tgt_pid = parse_jspecies(path_concordance_jspecies)["ANIb"].values
# All element-wise differences from the JSpecies reference must fall
# within the "high identity" tolerance.
assert result_pid - tgt_pid == pytest.approx(0, abs=tolerance_anib_hi)
def run_anim_jobs(
joblist: List[ComparisonJob], args: Namespace, logger: Logger
) -> None:
"""Pass ANIm nucmer jobs to the scheduler.

:param joblist: list of ComparisonJob namedtuples
:param args: command-line arguments for the run
:param logger: logging output

:raises PyaniException: under the multiprocessing scheduler, if any
    NUCmer comparison fails (non-zero cumulative return value)
"""
if args.scheduler == "multiprocessing":
logger.info("Running jobs with multiprocessing")
# A falsy args.workers (None/0) means "use all available workers".
if not args.workers:
logger.info("(using maximum number of worker threads)")
else:
logger.info("(using %d worker threads, if available)", args.workers)
# run_dependency_graph returns a cumulative exit value; anything
# above zero indicates that at least one job failed.
cumval = run_mp.run_dependency_graph(
[_.job for _ in joblist], workers=args.workers, logger=logger
)
if cumval > 0:
logger.error(
"At least one NUCmer comparison failed. "
+ "Please investigate (exiting)"
)
raise PyaniException("Multiprocessing run failed in ANIm")
logger.info("Multiprocessing run completed without error")
else:
# Any scheduler value other than "multiprocessing" is handled as SGE.
logger.info("Running jobs with SGE")
logger.info("Setting jobarray group size to %d", args.sgegroupsize)
# NOTE(review): this call is truncated in the visible excerpt; its
# remaining keyword arguments are not shown here.
run_sge.run_dependency_graph(
[_.job for _ in joblist],
logger=logger,
jgprefix=args.jobprefix,
# NOTE(review): this excerpt begins mid-docstring of an enclosing function
# whose signature is not visible here.
each input file
"""
if not args.skip_blastn:
logger.info("Fragmenting input files, and writing to %s", args.outdirname)
fragfiles, fraglengths = make_sequence_fragments(
args, logger, infiles, blastdir
)
# Run BLAST database-building and executables from a jobgraph
logger.info("Creating job dependency graph")
jobgraph = anib.make_job_graph(
infiles, fragfiles, anib.make_blastcmd_builder(args.method, blastdir)
)
if args.scheduler == "multiprocessing":
logger.info("Running dependency graph with multiprocessing")
# A non-zero cumulative value means at least one BLAST job failed;
# this is logged as a warning rather than aborting the run.
cumval = run_mp.run_dependency_graph(jobgraph, logger=logger)
if cumval > 0:
logger.warning(
f"At least one BLAST run failed. {args.method} may fail. Please investigate."
)
else:
logger.info("All multiprocessing jobs complete.")
elif args.scheduler == "SGE":
logger.info("Running dependency graph with SGE")
run_sge.run_dependency_graph(jobgraph, logger=logger)
else:
# An unrecognised scheduler is a fatal configuration error.
logger.error(f"Scheduler {args.scheduler} not recognised (exiting)")
raise SystemError(1)
else:
logger.warning("Skipping BLASTN runs (as instructed)!")
# Import fragment lengths from JSON
# NOTE(review): the body of this branch is not visible in the excerpt.
if args.method == "ANIblastall":
# NOTE(review): this excerpt starts mid-function; the enclosing def and the
# tail of the run_sge call at the end are not visible here.
if not args.skip_nucmer:
# Build one NUCmer (plus delta-filter) job per genome comparison.
joblist = anim.generate_nucmer_jobs(
infiles,
args.outdirname,
nucmer_exe=args.nucmer_exe,
filter_exe=args.filter_exe,
maxmatch=args.maxmatch,
jobprefix=args.jobprefix,
)
if args.scheduler == "multiprocessing":
logger.info("Running jobs with multiprocessing")
# workers=None means "use all available worker threads".
if args.workers is None:
logger.info("(using maximum number of available worker threads)")
else:
logger.info("(using %d worker threads, if available)", args.workers)
# Cumulative return value: 0 means every NUCmer job succeeded.
cumval = run_mp.run_dependency_graph(
joblist, workers=args.workers, logger=logger
)
logger.info("Cumulative return value: %d", cumval)
if cumval > 0:
logger.warning("At least one NUCmer comparison failed. ANIm may fail.")
else:
logger.info("All multiprocessing jobs complete.")
else:
logger.info("Running jobs with SGE")
logger.info("Jobarray group size set to %d", args.sgegroupsize)
run_sge.run_dependency_graph(
joblist,
logger=logger,
jgprefix=args.jobprefix,
sgegroupsize=args.sgegroupsize,
sgeargs=args.sgeargs,
# NOTE(review): excerpt begins inside a loop -- jobprefix, idx, ncmd and
# dcmd are defined earlier in the enclosing function (not visible here).
# Build jobs
njob = pyani_jobs.Job("%s_%06d-n" % (jobprefix, idx), ncmd)
fjob = pyani_jobs.Job("%s_%06d-f" % (jobprefix, idx), dcmd)
# The filter job must wait for its corresponding NUCmer job to finish.
fjob.add_dependency(njob)
joblist.append(fjob)
# Pass commands to the appropriate scheduler
logger.info("Passing %d jobs to scheduler", len(joblist))
if args.scheduler == 'multiprocessing':
logger.info("Running jobs with multiprocessing")
# A falsy args.workers (None/0) means "use all available workers".
if not args.workers:
logger.info("(using maximum number of worker threads)")
else:
logger.info("(using %d worker threads, if available)",
args.workers)
# Non-zero cumulative value means at least one comparison failed.
cumval = run_mp.run_dependency_graph(joblist,
workers=args.workers,
logger=logger)
if 0 < cumval:
logger.error("At least one NUCmer comparison failed. " +
"Please investigate (exiting)")
raise pyani_tools.PyaniException("Multiprocessing run " +
"failed in ANIm")
else:
logger.info("Multiprocessing run completed without error")
else:
# Any scheduler value other than 'multiprocessing' is handled as SGE.
logger.info("Running jobs with SGE")
logger.info("Setting jobarray group size to %d", args.sgegroupsize)
run_sge.run_dependency_graph(joblist, logger=logger,
jgprefix=args.jobprefix,
sgegroupsize=args.sgegroupsize,
sgeargs=args.sgeargs)