Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_anno_read():
    """Exercise Gene2GoReader on the NCBI gene2go file and compare it with the old reader."""
    gene2go_path = os.path.join(REPO, 'gene2go')
    _dnld_anno(gene2go_path)

    # A reader built without a taxid argument is expected to hold exactly one taxid
    print('\nTEST STORING ONLY ONE SPECIES')
    reader = Gene2GoReader(gene2go_path)
    num_taxids = len(reader.taxid2asscs)
    assert num_taxids == 1
    reader.prt_summary_anno2ev()

    # A reader built with taxids=True is expected to hold more than one taxid
    print('\nTEST STORING ALL SPECIES')
    reader = Gene2GoReader(gene2go_path, taxids=True)
    assert len(reader.taxid2asscs) > 1, '**EXPECTED MORE: len(taxid2asscs) == {N}'.format(
        N=len(reader.taxid2asscs))
    reader.prt_summary_anno2ev()

    # The all-taxid reader must reproduce what the old function returns for taxid 9606.
    # get_id2gos_nss(taxids=[9606]) stands where read_ncbi_gene2go(fin_anno, [9606])
    # was called before that line was commented out.
    print('\nTEST GETTING ASSOCIATIONS FOR ONE SPECIES')
    print("\nTEST read_ncbi_gene2go_old: [9606]")
    old_g2go_hsa = read_ncbi_gene2go_old(gene2go_path, [9606])
    new_g2go_hsa = reader.get_id2gos_nss(taxids=[9606])
    assert old_g2go_hsa == new_g2go_hsa, \
        'OLD({O}) != NEW({N})'.format(O=len(old_g2go_hsa), N=len(new_g2go_hsa))

    # Same comparison, selecting the taxid with the scalar `taxid` keyword
    # instead of the `taxids` list keyword.
    print("\nTEST read_ncbi_gene2go_old: 9606")
    new_g2go_single = reader.get_id2gos_nss(taxid=9606)
    assert old_g2go_hsa == new_g2go_single

    print('\nTEST GETTING REVERSE ASSOCIATIONS: GO2GENES')
def test_anno_read():
    """Exercise Gene2GoReader on the NCBI gene2go file and compare it with the old reader."""
    gene2go_path = os.path.join(REPO, 'gene2go')
    _dnld_anno(gene2go_path)

    # A reader built without a taxid argument is expected to hold exactly one taxid
    print('\nTEST STORING ONLY ONE SPECIES')
    reader = Gene2GoReader(gene2go_path)
    num_taxids = len(reader.taxid2asscs)
    assert num_taxids == 1
    reader.prt_summary_anno2ev()

    # A reader built with taxids=True is expected to hold more than one taxid
    print('\nTEST STORING ALL SPECIES')
    reader = Gene2GoReader(gene2go_path, taxids=True)
    assert len(reader.taxid2asscs) > 1, '**EXPECTED MORE: len(taxid2asscs) == {N}'.format(
        N=len(reader.taxid2asscs))
    reader.prt_summary_anno2ev()

    # The all-taxid reader must reproduce what the old function returns for taxid 9606.
    # get_id2gos_nss(taxids=[9606]) stands where read_ncbi_gene2go(fin_anno, [9606])
    # was called before that line was commented out.
    print('\nTEST GETTING ASSOCIATIONS FOR ONE SPECIES')
    print("\nTEST read_ncbi_gene2go_old: [9606]")
    old_g2go_hsa = read_ncbi_gene2go_old(gene2go_path, [9606])
    new_g2go_hsa = reader.get_id2gos_nss(taxids=[9606])
    assert old_g2go_hsa == new_g2go_hsa, \
        'OLD({O}) != NEW({N})'.format(O=len(old_g2go_hsa), N=len(new_g2go_hsa))
# Get http://geneontology.org/ontology/go-basic.obo
download_go_basic_obo()

# 1b. Download Associations, if necessary
#     Get ftp://ftp.ncbi.nlm.nih.gov/gene/DATA/gene2go.gz
fin_gene2go = download_ncbi_associations()

# 2. Load Ontologies, Associations and Background gene set
# 2a. Load Ontologies
godag = GODag("go-basic.obo")

# 2b. Load Associations
#     Read NCBI's gene2go three times, each reader keeping a different taxid
#     selection: all species, taxid 10090 only, and taxids 10090 plus 9606.
objanno_all = Gene2GoReader(fin_gene2go, godag=godag, taxids=True)
objanno_mmu = Gene2GoReader(fin_gene2go, godag=godag, taxids=[10090])
objanno_mmuhsa = Gene2GoReader(fin_gene2go, godag=godag, taxids=[10090, 9606])

# Get associations
# pylint: disable=bad-whitespace
ns2assoc_all_mmu = _run_get_ns2assc(10090, objanno_all)
ns2assoc_mmu_mmu = _run_get_ns2assc(10090, objanno_mmu)
ns2assoc_mmuhsa_all = _run_get_ns2assc(True, objanno_mmuhsa)
ns2assoc_mmuhsa_mmu = _run_get_ns2assc(10090, objanno_mmuhsa)

# Check results: the taxid-10090 associations must be the same in every
# namespace no matter which reader they were pulled from.
for nspc in ('BP', 'MF', 'CC'):
    mmu_only = ns2assoc_mmu_mmu[nspc]
    assert mmu_only == ns2assoc_all_mmu[nspc]
    assert mmu_only == ns2assoc_mmuhsa_mmu[nspc]

# The arguments do not depend on the namespace, so this runs once, after the loop
_chk_mmuhsa_all(objanno_mmuhsa, objanno_all, ns2assoc_mmuhsa_all)
def get_objanno(fin_anno, anno_type=None, **kws):
    """Return a reader object for an annotation file.

    The format description is obtained from get_anno_desc(fin_anno, anno_type).
    The descriptions 'gene2go', 'gaf', 'gpad' and 'id2gos' select
    Gene2GoReader, GafReader, GpadReader and IdToGosReader respectively.

    Only the keyword arguments named in the selected reader's `exp_kws` are
    forwarded to it; any others are dropped.
    kws get_objanno: taxids hdr_only prt allow_missing_symbol
    (for gene2go: taxid taxids)

    Raises RuntimeError when the description is None or is not one of the
    four listed above.
    """
    anno_type = get_anno_desc(fin_anno, anno_type)
    # Checked in this order; the first description that compares equal wins
    readers = (
        ('gene2go', Gene2GoReader),
        ('gaf', GafReader),
        ('gpad', GpadReader),
        ('id2gos', IdToGosReader),
    )
    if anno_type is not None:
        for desc, reader_cls in readers:
            if anno_type == desc:
                accepted = {key: kws[key] for key in reader_cls.exp_kws.intersection(kws.keys())}
                return reader_cls(fin_anno, **accepted)
    raise RuntimeError('UNEXPECTED ANNOTATION FILE FORMAT: {F} {D}'.format(
        F=fin_anno, D=anno_type))
def read_ncbi_gene2go(fin_gene2go, taxids=None, namespace='BP', **kws):
    """Read NCBI's gene2go. Return gene2go data for user-specified taxids.

    DEPRECATED: prints a deprecation notice on every call; use Gene2GoReader
    from goatools.anno.genetogo_reader instead.

    Args:
        fin_gene2go: gene2go annotation file handed to Gene2GoReader.
        taxids: taxid selection handed to Gene2GoReader and get_taxid2asscs.
        namespace: namespace handed to get_id2gos on the single-taxid path.
        **kws: options. On the single-taxid path only keys found in
            AnnoOptions.keys_exp are forwarded. Passing 'taxid2asscs'
            forces the multi-taxid path and the given mapping is filled
            through fill_taxid2asscs.

    Returns:
        The result of get_id2gos(...) when 'taxid2asscs' was not passed and
        the reader holds exactly one taxid; otherwise the result of
        get_id2gos_all(...).
    """
    print('DEPRECATED read_ncbi_gene2go: USE Gene2GoReader FROM goatools.anno.genetogo_reader')
    # Report who called us. f_back is None when this function is the
    # bottom-most frame (e.g. started directly as a low-level thread entry
    # point); the notice must not crash the call in that case.
    # pylint: disable=protected-access
    caller_frame = sys._getframe().f_back
    if caller_frame is not None:
        caller_code = caller_frame.f_code
        print('DEPRECATED read_ncbi_gene2go CALLED FROM: {PY} BY {FNC}'.format(
            PY=caller_code.co_filename, FNC=caller_code.co_name))
    obj = Gene2GoReader(fin_gene2go, taxids=taxids)
    usr_gave_t2asscs = 'taxid2asscs' in kws
    # By default, return id2gos. User can cause go2geneids to be returned by:
    #   >>> read_ncbi_gene2go(..., go2geneids=True
    if not usr_gave_t2asscs and len(obj.taxid2asscs) == 1:
        taxid = next(iter(obj.taxid2asscs))
        kws_ncbi = {k: v for k, v in kws.items() if k in AnnoOptions.keys_exp}
        kws_ncbi['taxid'] = taxid
        return obj.get_id2gos(namespace, **kws_ncbi)
    # Optional detailed associations split by taxid and having both ID2GOs & GO2IDs
    # e.g., taxid2asscs = defaultdict(lambda: defaultdict(lambda: defaultdict(set))
    t2asscs_ret = obj.get_taxid2asscs(taxids, **kws)
    if usr_gave_t2asscs:
        # Only the caller-supplied mapping is ever filled, so no placeholder
        # mapping is built when the caller did not pass one.
        obj.fill_taxid2asscs(kws['taxid2asscs'], t2asscs_ret)
    return obj.get_id2gos_all(t2asscs_ret)
def __init__(self, filename=None, **kws):
    """Initialize the base class with the 'gene2go' name, then build taxid2asscs.

    kws: taxids or taxid (forwarded unchanged to the base-class initializer)
    """
    anno_name = 'gene2go'
    super(Gene2GoReader, self).__init__(anno_name, filename, **kws)
    # Each taxid has a list of namedtuples - one for each line in the annotations
    self.taxid2asscs = self._init_taxid2asscs()