Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_dependency_graph_run(self):
"""Test that module runs dependency graph."""
fragresult = anib.fragment_fasta_files(self.infiles, self.outdir, self.fraglen)
blastcmds = anib.make_blastcmd_builder("ANIb", self.outdir)
jobgraph = anib.make_job_graph(self.infiles, fragresult[0], blastcmds)
result = run_multiprocessing.run_dependency_graph(jobgraph)
self.assertEqual(0, result)
def test_blastall_single(path_fna_two, tmp_path):
"""Generate legacy BLASTN command-line."""
cmd = anib.construct_blastall_cmdline(path_fna_two[0], path_fna_two[1], tmp_path)
expected = (
f"blastall -p blastn -o {tmp_path / str(path_fna_two[0].stem + '_vs_' + path_fna_two[1].stem + '.blast_tab')} "
f"-i {path_fna_two[0]} "
f"-d {path_fna_two[1]} "
"-X 150 -q -1 -F F -e 1e-15 -b 1 -v 1 -m 8"
)
assert cmd == expected
database constructed from the whole genome input. The BLAST+ blastn tool
is then used to query each set of fragments against each BLAST+ database,
in turn.
For each query, the BLAST+ .tab output is parsed to obtain alignment length,
identity and similarity error count. Alignments below a threshold are not
included in the calculation (this introduces systematic bias with respect to
ANIm). The results are processed to calculate the ANI percentages, coverage,
and similarity error.
The calculated values are stored in the local SQLite3 database.
"""
logger.info("Running ANIm analysis") # announce that we're starting
# Get BLAST+ version - this will be used in the database entries
blastn_version = anib.get_version(args.blastn_exe)
logger.info(f"BLAST+ blastn version: {blastn_version}")
# Use provided name, or make new one for this analysis
start_time = datetime.datetime.now()
name = args.name or "_".join(["ANIb", start_time.isoformat()])
logger.info(f"Analysis name: {name}")
# Connect to existing database (which may be "clean" or have old analyses)
logger.info(f"Connecting to database {args.dbpath}")
try:
session = get_session(args.dbpath)
except Exception:
logger.error(
f"Could not connect to database {args.dbpath} (exiting)", exc_info=True
)
raise SystemExit(1)
)
existingfiles = collect_existing_output(args.outdir, "blastn", args)
logger.info(
f"\tIdentified {len(existingfiles)} existing output files for reuse"
)
else:
existingfiles = None
logger.info(f"\tIdentified no existing output files")
# Split the input genome files into contiguous fragments of the specified size,
# as described in Goris et al. We create a new directory to hold sequence
# fragments, away from the main genomes
logger.info(f"Splitting input genome files into {args.fragsize}nt fragments...")
fragdir = Path(args.outdir) / "fragments"
os.makedirs(fragdir, exist_ok=True)
fragfiles, fraglens = anib.fragment_fasta_files(
[Path(str(_.path)) for _ in genomes],
Path(args.outdir) / "fragments",
args.fragsize,
)
logger.info(f"...wrote {len(fragfiles)} fragment files to {fragdir}")
# Create list of BLASTN jobs for each comparison still to be performed
logger.info("Creating blastn jobs for ANIb...")
joblist = generate_joblist(
comparisons_to_run, existingfiles, fragfiles, fraglens, args, logger
)
logger.info(f"...created {len(joblist)} blastn jobs")
raise NotImplementedError
def make_sequence_fragments(
args: Namespace, logger: Logger, infiles: List[Path], blastdir: Path
) -> Tuple[List, Dict]:
"""Return tuple of fragment files, and fragment sizes.
:param args: Namespace of command-line arguments
:param logger: logging object
:param infiles: iterable of sequence files to fragment
:param blastdir: path of directory to place BLASTN databases
of fragments
Splits input FASTA sequence files into the fragments (a requirement
for ANIb methods), and writes BLAST databases of these fragments,
and fragment lengths of sequences, to local files.
"""
fragfiles, fraglengths = anib.fragment_fasta_files(infiles, blastdir, args.fragsize)
# Export fragment lengths as JSON, in case we re-run with --skip_blastn
fragpath = blastdir / "fraglengths.json"
logger.info(f"Writing cache of fragment lengths to {fragpath}")
with open(fragpath, "w") as ofh:
json.dump(fraglengths, ofh)
return fragfiles, fraglengths