Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_anim_concordance(
paths_concordance_fna, path_concordance_jspecies, tolerance_anim, tmp_path
):
"""Check ANIm results are concordant with JSpecies."""
# Perform ANIm on the input directory contents
# We have to separate nucmer/delta-filter command generation
# because Travis-CI doesn't play nicely with changes we made
# for local SGE/OGE integration.
# This might be avoidable with a scheduler flag passed to
# jobgroup generation in the anim.py module. That's a TODO.
ncmds, fcmds = anim.generate_nucmer_commands(paths_concordance_fna, tmp_path)
(tmp_path / "nucmer_output").mkdir(exist_ok=True, parents=True)
run_mp.multiprocessing_run(ncmds)
# delta-filter commands need to be treated with care for
# Travis-CI. Our cluster won't take redirection or semicolon
# separation in individual commands, but the wrapper we wrote
# for this (delta_filter_wrapper.py) can't be called under
# Travis-CI. So we must deconstruct the commands below
dfcmds = [
" > ".join([" ".join(fcmd.split()[1:-1]), fcmd.split()[-1]]) for fcmd in fcmds
]
run_mp.multiprocessing_run(dfcmds)
orglengths = pyani_files.get_sequence_lengths(paths_concordance_fna)
results = anim.process_deltadir(tmp_path / "nucmer_output", orglengths)
(tmp_path / "nucmer_output").mkdir(exist_ok=True, parents=True)
run_mp.multiprocessing_run(ncmds)
# delta-filter commands need to be treated with care for
# Travis-CI. Our cluster won't take redirection or semicolon
# separation in individual commands, but the wrapper we wrote
# for this (delta_filter_wrapper.py) can't be called under
# Travis-CI. So we must deconstruct the commands below
dfcmds = [
" > ".join([" ".join(fcmd.split()[1:-1]), fcmd.split()[-1]]) for fcmd in fcmds
]
run_mp.multiprocessing_run(dfcmds)
orglengths = pyani_files.get_sequence_lengths(paths_concordance_fna)
results = anim.process_deltadir(tmp_path / "nucmer_output", orglengths)
result_pid = results.percentage_identity
result_pid.to_csv(tmp_path / "pyani_anim.tab", sep="\t")
# Compare JSpecies output to results
result_pid = (result_pid.sort_index(axis=0).sort_index(axis=1) * 100.0).values
tgt_pid = parse_jspecies(path_concordance_jspecies)["ANIm"].values
assert result_pid - tgt_pid == pytest.approx(0, abs=tolerance_anim)
def test_deltadir_parsing(delta_output_dir):
"""Process test directory of .delta files into ANIResults."""
seqfiles = pyani_files.get_fasta_files(delta_output_dir.seqdir)
orglengths = pyani_files.get_sequence_lengths(seqfiles)
result = anim.process_deltadir(delta_output_dir.deltadir, orglengths)
assert_frame_equal(
result.percentage_identity.sort_index(1).sort_index(),
delta_output_dir.deltaresult.sort_index(1).sort_index(),
)
def test_anim_collection(self):
"""Test generation of list of NUCmer comparison commands."""
files = [Path("file1"), Path("file2"), Path("file3"), Path("file4")]
cmds_nucmer, cmds_filter = anim.generate_nucmer_commands(files)
tgts_nucmer = [
"nucmer --mum -p nucmer_output/file1_vs_file2 file1 file2",
"nucmer --mum -p nucmer_output/file1_vs_file3 file1 file3",
"nucmer --mum -p nucmer_output/file1_vs_file4 file1 file4",
"nucmer --mum -p nucmer_output/file2_vs_file3 file2 file3",
"nucmer --mum -p nucmer_output/file2_vs_file4 file2 file4",
"nucmer --mum -p nucmer_output/file3_vs_file4 file3 file4",
]
tgts_filter = [
"delta_filter_wrapper.py delta-filter -1 nucmer_output/file1_vs_file2.delta nucmer_output/file1_vs_file2.filter",
"delta_filter_wrapper.py delta-filter -1 nucmer_output/file1_vs_file3.delta nucmer_output/file1_vs_file3.filter",
"delta_filter_wrapper.py delta-filter -1 nucmer_output/file1_vs_file4.delta nucmer_output/file1_vs_file4.filter",
"delta_filter_wrapper.py delta-filter -1 nucmer_output/file2_vs_file3.delta nucmer_output/file2_vs_file3.filter",
"delta_filter_wrapper.py delta-filter -1 nucmer_output/file2_vs_file4.delta nucmer_output/file2_vs_file4.filter",
"delta_filter_wrapper.py delta-filter -1 nucmer_output/file3_vs_file4.delta nucmer_output/file3_vs_file4.filter",
]
) -> None:
"""Update the Comparison table with the completed result set.
:param joblist: list of ComparisonJob namedtuples
:param run: Run ORM object for the current ANIm run
:param session: active pyanidb session via ORM
:param nucmer_version: version of nucmer used for the comparison
:param args: command-line arguments for this run
:param logger: logging output
The Comparison table stores individual comparison results, one per row.
"""
# Add individual results to Comparison table
for job in tqdm(joblist, disable=args.disable_tqdm):
logger.debug("\t%s vs %s", job.query.description, job.subject.description)
aln_length, sim_errs = anim.parse_delta(job.outfile)
qcov = aln_length / job.query.length
scov = aln_length / job.subject.length
try:
pid = 1 - sim_errs / aln_length
except ZeroDivisionError: # aln_length was zero (no alignment)
pid = 0
run.comparisons.append(
Comparison(
query=job.query,
subject=job.subject,
aln_length=aln_length,
sim_errs=sim_errs,
identity=pid,
cov_query=qcov,
cov_subject=scov,
program="nucmer",
genome_ids = pyani_db.get_genome_ids_by_run(args.dbpath, run_id)
logger.debug("Genome IDs for analysis with ID %s:\n\t%s",
run_id, genome_ids)
# Generate all pair combinations of genome IDs
logger.info(
"Compiling pairwise comparisons (this can take time for large datasets)")
comparison_ids = list(combinations(tqdm(genome_ids), 2))
logger.debug("Complete pairwise comparison list:\n\t%s", comparison_ids)
# Check for existing comparisons; if one has been done (for the same
# software package, version, and setting) we remove it from the list
# of comparisons to be performed, but we add a new entry to the
# runs_comparisons table.
# TODO: turn this into a generator or some such?
nucmer_version = anim.get_version(args.nucmer_exe)
# Existing entries for the comparison:run link table
new_link_ids = [(qid, sid) for (qid, sid) in comparison_ids if
pyani_db.get_comparison(args.dbpath, qid, sid, "nucmer",
nucmer_version,
maxmatch=args.maxmatch) is
not None]
logger.info("Existing comparisons to be associated with new run:\n\t%s",
new_link_ids)
if len(new_link_ids) > 0:
for (qid, sid) in tqdm(new_link_ids):
pyani_db.add_comparison_link(args.dbpath, run_id, qid, sid,
"nucmer", nucmer_version,
maxmatch=args.maxmatch)
# If we are in recovery mode, we are salvaging output from a previous
else:
logger.info("Multiprocessing run completed without error")
else:
logger.info("Running jobs with SGE")
logger.info("Setting jobarray group size to %d", args.sgegroupsize)
run_sge.run_dependency_graph(joblist, logger=logger,
jgprefix=args.jobprefix,
sgegroupsize=args.sgegroupsize,
sgeargs=args.sgeargs)
# Process output and add results to database
# We have to drop out of threading/multiprocessing to do this: Python's
# SQLite3 interface doesn't allow sharing connections and cursors
logger.info("Adding comparison results to database")
for comp in tqdm(comparisons):
aln_length, sim_errs = anim.parse_delta(comp.outfile)
qlen = pyani_db.get_genome_length(args.dbpath, comp.query_id)
slen = pyani_db.get_genome_length(args.dbpath, comp.subject_id)
qcov = aln_length / qlen
scov = aln_length / slen
pid = 1 - sim_errs / aln_length
comp_id = pyani_db.add_comparison(args.dbpath, comp.query_id,
comp.subject_id, aln_length,
sim_errs, pid, qcov, scov,
"nucmer", nucmer_version,
maxmatch=args.maxmatch)
link_id = pyani_db.add_comparison_link(args.dbpath, run_id,
comp.query_id,
comp.subject_id,
"nucmer", nucmer_version,
maxmatch=args.maxmatch)
logger.debug("Added ID %s vs %s, as comparison %s (link: %s)",
The NUCmer .delta file output is parsed to obtain an alignment length
and similarity error count for every unique region alignment between
the two organisms, as represented by the sequences in the FASTA files.
These are processed to give matrices of aligned sequence lengths,
average nucleotide identity (ANI) percentages, coverage (aligned
percentage of whole genome), and similarity error cound for each pairwise
comparison.
"""
logger.info("Running ANIm")
logger.info("Generating NUCmer command-lines")
deltadir = args.outdirname / ALIGNDIR["ANIm"]
logger.info("Writing nucmer output to %s", deltadir)
# Schedule NUCmer runs
if not args.skip_nucmer:
joblist = anim.generate_nucmer_jobs(
infiles,
args.outdirname,
nucmer_exe=args.nucmer_exe,
filter_exe=args.filter_exe,
maxmatch=args.maxmatch,
jobprefix=args.jobprefix,
)
if args.scheduler == "multiprocessing":
logger.info("Running jobs with multiprocessing")
if args.workers is None:
logger.info("(using maximum number of available worker threads)")
else:
logger.info("(using %d worker threads, if available)", args.workers)
cumval = run_mp.run_dependency_graph(
joblist, workers=args.workers, logger=logger
)