Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
the corresponding genome id."""
dict_compare, dict_paths = dict(), dict()
for qry_node, qry_dict in fastani_verification.items():
user_label = qry_node.taxon.label
dict_paths[user_label] = genomes[user_label]
dict_compare[user_label] = set()
for node in qry_dict.get('potential_g'):
leafnode = node[0]
shortleaf = leafnode.taxon.label
if leafnode.taxon.label.startswith('GB_') or leafnode.taxon.label.startswith('RS_'):
shortleaf = leafnode.taxon.label[3:]
ref_path = os.path.join(
Config.FASTANI_GENOMES, shortleaf + Config.FASTANI_GENOMES_EXT)
if not os.path.isfile(ref_path):
raise GTDBTkExit(f'Reference genome missing from FastANI database: {ref_path}')
dict_compare[user_label].add(shortleaf)
dict_paths[shortleaf] = ref_path
return dict_compare, dict_paths
"""Find node of ingroup taxon in tree."""
ingroup_node = None
for node in tree.postorder_node_iter():
support, taxon, auxiliary_info = parse_label(node.label)
if taxon:
taxa = [t.strip() for t in taxon.split(';')]
if ingroup_taxon in taxa:
if ingroup_node is not None:
raise GTDBTkExit(f'Ingroup taxon {ingroup_taxon} '
f'identified multiple times.')
ingroup_node = node
if ingroup_node is None:
raise GTDBTkExit(f'Ingroup taxon {ingroup_taxon} not found in tree.')
return ingroup_node
def _find_ingroup_taxon(self, ingroup_taxon, tree):
"""Find node of ingroup taxon in tree."""
ingroup_node = None
for node in tree.postorder_node_iter():
support, taxon, auxiliary_info = parse_label(node.label)
if taxon:
taxa = [t.strip() for t in taxon.split(';')]
if ingroup_taxon in taxa:
if ingroup_node is not None:
raise GTDBTkExit(f'Ingroup taxon {ingroup_taxon} '
f'identified multiple times.')
ingroup_node = node
if ingroup_node is None:
raise GTDBTkExit(f'Ingroup taxon {ingroup_taxon} not found in tree.')
return ingroup_node
def _calculate(self):
self.logger.info('Calculating Mash distances.')
args = ['mash', 'dist', '-p', self.cpus, '-d', self.max_d, '-v',
self.mash_v, self.ref_sketch.path, self.qry_sketch.path]
args = list(map(str, args))
with open(self.path, 'w') as f_out:
proc = subprocess.Popen(args, stdout=f_out,
stderr=subprocess.PIPE, encoding='utf-8')
_, stderr = proc.communicate()
if proc.returncode != 0:
raise GTDBTkExit(f'Error running Mash dist: {proc.stderr.read()}')
try:
workerProc = [mp.Process(target=self._workerThread, args=(
workerQueue, writerQueue, n_skipped)) for _ in range(self.threads)]
writeProc = mp.Process(target=self._writerThread, args=(
len(gene_files), writerQueue))
writeProc.start()
for p in workerProc:
p.start()
for p in workerProc:
p.join()
if p.exitcode != 0:
raise GTDBTkExit('An error was encountered while running hmmsearch.')
writerQueue.put(None)
writeProc.join()
except Exception:
for p in workerProc:
p.terminate()
writeProc.terminate()
raise
if n_skipped.value > 0:
genome_s = 'genome' if n_skipped.value == 1 else 'genomes'
self.logger.warning(f'Pfam skipped {n_skipped.value} {genome_s} '
f'due to pre-existing data, see warnings.log')
def _get_ingroup_domain(self, ingroup_taxon) -> str:
"""Get domain on ingroup taxon."""
# read GTDB taxonomy in order to establish domain on ingroup taxon
gtdb_taxonomy = Taxonomy().read(TAXONOMY_FILE)
ingroup_domain = None
for taxa in gtdb_taxonomy.values():
if ingroup_taxon in taxa:
ingroup_domain = taxa[Taxonomy.DOMAIN_IDX]
if ingroup_domain is None:
raise GTDBTkExit(f'Ingroup taxon {ingroup_taxon} was not found in '
f'the GTDB taxonomy.')
return ingroup_domain
def run(self, gtdbtk_output_dir, ar122_metadata_file, bac120_metadata_file,
output_file, gtdbtk_prefix):
"""Translate GTDB to NCBI classification via majority vote."""
# Set the output directories
if not (ar122_metadata_file or bac120_metadata_file):
raise GTDBTkExit('You must specify at least one of --ar122_metadata_file or --bac120_metadata_file')
ar_summary = os.path.join(gtdbtk_output_dir,
PATH_AR122_SUMMARY_OUT.format(prefix=gtdbtk_prefix)) \
if ar122_metadata_file else None
ar_tree = os.path.join(gtdbtk_output_dir,
PATH_AR122_TREE_FILE.format(prefix=gtdbtk_prefix)) \
if ar122_metadata_file else None
bac_summary = os.path.join(gtdbtk_output_dir,
PATH_BAC120_SUMMARY_OUT.format(prefix=gtdbtk_prefix)) \
if bac120_metadata_file else None
bac_tree = os.path.join(gtdbtk_output_dir,
PATH_BAC120_TREE_FILE.format(prefix=gtdbtk_prefix)) \
if bac120_metadata_file else None
# Create the output file directory.
output_dir = os.path.dirname(output_file)
if output_dir and not os.path.isdir(output_dir):
if r is not None:
args.extend(['-r', r])
if ql is not None:
args.extend(['--ql', ql])
if rl is not None:
args.extend(['--rl', rl])
args.extend(['-o', output])
self.logger.debug(' '.join(args))
proc = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, encoding='utf-8')
stdout, stderr = proc.communicate()
if proc.returncode != 0:
self.logger.error('STDOUT:\n' + stdout)
self.logger.error('STDERR:\n' + stderr)
raise GTDBTkExit('FastANI returned a non-zero exit code.')
# Parse the output
return self.parse_output_file(output)
def _load_metadata(self):
"""Loads the metadata from an existing Mash sketch file."""
args = ['mash', 'info', '-t', self.path]
proc = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, encoding='utf-8')
stdout, stderr = proc.communicate()
if proc.returncode != 0:
raise GTDBTkExit(f'Error reading Mash sketch file {self.path}:\n{stderr}')
for hashes, length, path in re.findall(r'(\d+)\t(\d+)\t(.+)\t.+\n', stdout):
self.data[path] = (int(hashes), int(length))