Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
c = (1.0, 0.65, 0.0) # orange
else:
c = (1.0, 0.0, 0.0)
ax.plot((boundary, boundary),
(i, i + 0.5), c=c, lw=2, zorder=2)
percentiles[i] = [p10, p50, p90]
# create scatter plot and results table
x = []
y = []
c = []
labels = []
rank_labels = []
for i, rank in enumerate(sorted(medians_for_taxa.keys())):
rank_label = Taxonomy.rank_labels[rank]
rank_labels.append(rank_label + ' (%d)' %
len(medians_for_taxa[rank]))
mono = []
poly = []
no_inference = []
for clade_label, dists in medians_for_taxa[rank].iteritems():
md = np_median(dists)
x.append(md)
y.append(i)
labels.append(clade_label)
if self._is_integer(clade_label.split('^')[-1]):
# taxa with a numerical suffix after a caret indicate
# polyphyletic groups when decorated with tax2tree
c.append((1.0, 0.0, 0.0))
# get median RED values for domain
if ingroup_domain == 'd__Bacteria':
median_reds = RED_DIST_BAC_DICT
elif ingroup_domain == 'd__Archaea':
median_reds = RED_DIST_ARC_DICT
else:
raise GTDBTkExit(f'Unrecognized GTDB domain: {ingroup_domain}.')
# report median values
domain = ingroup_domain.replace('d__', '')
self.logger.info('Median RED values for {}:'.format(domain))
for idx, rank_prefix in enumerate(Taxonomy.rank_prefixes):
if idx != Taxonomy.DOMAIN_IDX and idx != Taxonomy.SPECIES_IDX:
self.logger.info(' {}\t{:.3f}'.format(
Taxonomy.rank_labels[idx].capitalize(),
median_reds[rank_prefix]))
return median_reds
Parameters
----------
tax_str : str
Greengenes-style taxonomy string.
Returns
-------
dict : d[rank_label] -> taxon
Taxon at each taxonomic rank.
"""
taxa = self.taxa(tax_str)
d = {}
for rank, taxon in enumerate(taxa):
d[Taxonomy.rank_labels[rank]] = taxon
----------
rank_label : str (e.g., class or order)
Rank of interest
taxonomy : d[unique_id] -> [d__; ...; s__]
Taxonomy strings indexed by unique ids.
Returns
-------
dict : d[taxon] -> set of extant taxa
Extant taxa for named groups at the specified rank.
"""
assert (rank_label in Taxonomy.rank_labels)
d = defaultdict(set)
rank_index = Taxonomy.rank_labels.index(rank_label)
for taxon_id, taxa in taxonomy.items():
if taxa[rank_index] != Taxonomy.rank_prefixes[rank_index]:
d[taxa[rank_index]].add(taxon_id)
return d
def extant_taxa(self, taxonomy):
    """Collect the extant taxa of every named group across all ranks.

    Calls extant_taxa_for_rank() once per entry in Taxonomy.rank_labels
    and merges the per-rank results into a single dictionary. If the
    same taxon key is produced for more than one rank, the entry from
    the later rank replaces the earlier one.

    Parameters
    ----------
    taxonomy : d[unique_id] -> [d__; ...; s__]
        Taxonomy strings indexed by unique ids.

    Returns
    -------
    dict : d[taxon] -> set of extant taxa
        Extant taxa for named groups, combined over all ranks.
    """
    return {
        taxon: members
        for label in Taxonomy.rank_labels
        for taxon, members in self.extant_taxa_for_rank(label, taxonomy).items()
    }
Dendropy Tree.
taxonomy : d[extant_taxon_id] -> taxa list
Taxon labels for extant taxa.
Returns
-------
d[taxon] -> [(Node, F-measure, precision, recall), ...]
Node(s) with highest F-measure for each taxon.
"""
# get named lineages/taxa at each taxonomic rank
taxa_at_rank = Taxonomy().named_lineages_at_rank(taxonomy)
# get extant taxa for each taxon label
extent_taxa_with_label = {}
for i, rank in enumerate(Taxonomy.rank_labels):
extent_taxa_with_label[i] = Taxonomy().extant_taxa_for_rank(rank, taxonomy)
# get parent taxon for each taxon:
taxon_parents = Taxonomy().parents(taxonomy)
# get number of leaves and taxon in each lineage
self.logger.info('Calculating taxa within each lineage.')
for node in tree.preorder_node_iter():
num_leaves = 0
taxa_count = defaultdict(lambda: defaultdict(int))
for leaf in node.leaf_iter():
num_leaves += 1
for rank_index, taxon in enumerate(taxonomy[leaf.taxon.label]):
if taxon != Taxonomy.rank_prefixes[rank_index]:
taxa_count[rank_index][taxon] += 1
node.num_leaves = num_leaves
node.taxa_count = taxa_count
taxa_in_tree = defaultdict(int)
for leaf in tree.leaf_node_iter():
for taxon in taxonomy[leaf.taxon.label]:
taxa_in_tree[taxon] += 1
# find node with best F-measure for each taxon
fmeasure_for_taxa = {}
for rank_index in range(0, len(Taxonomy.rank_labels)):
# if rank_index == 6: #*** skip species
# continue
self.logger.info('Processing {:,} taxa at {} rank.'.format(
len(taxa_at_rank[rank_index]),
Taxonomy.rank_labels[rank_index].capitalize()))
for taxon in taxa_at_rank[rank_index]:
if rank_index == 0:
# processing taxa at the domain is a special case
taxon_parent_node = tree.seed_node
else:
# find first named parent
# e.g., Cyanobacteria for Synechococcales in d__Bacteria;p__Cyanobacteria;c__;o__Synechococcales
parent_taxon = 'x__'
parent_index = rank_index - 1
while len(parent_taxon) == 3 and parent_index != -1:
parent_taxon = taxon_parents[taxon][parent_index]
parent_index -= 1
if parent_taxon in fmeasure_for_taxa:
# only need to process the lineage below the parent node,