Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_key_function_by_fetch(self):
    """Fetch a region using the short record key produced by split_char='|'."""
    # With split_char='|' the long NCBI header is split and the short
    # accession becomes a key; duplicate keys are dropped, not fatal.
    index = Faidx('data/genes.fasta', split_char='|', duplicate_action="drop")
    expected = 'TTGAAGATTTTGCATGCAGCAGGTGCGCAAGGTGAAATGTTCACTGTTAAA'
    fetched = index.fetch('KF435150.1', 100, 150)
    assert str(fetched) == expected
def test_fetch_middle(self):
    """Fetch an interior region addressed by the full (unsplit) header."""
    index = Faidx('data/genes.fasta')
    expected = 'TTGAAGATTTTGCATGCAGCAGGTGCGCAAGGTGAAATGTTCACTGTTAAA'
    fetched = index.fetch('gi|557361099|gb|KF435150.1|', 100, 150)
    assert str(fetched) == expected
# NOTE(review): fragment of a test body — the enclosing `def`, the open
# `genes` file handle, and the `indexed` list are defined outside this
# view, and the original indentation appears to have been flattened.
fasta = genes.readlines()
# fasta is already a list after readlines(), so this is simply its length.
n_lines = sum(1 for line in fasta)
# For each line index n, write a copy of the FASTA with that one line
# truncated, then verify that indexing the malformed file fails.
for n in range(n_lines):
with NamedTemporaryFile(mode='w') as lines:
for i, line in enumerate(fasta):
# Only truncate full-width (71-char incl. newline) sequence lines,
# never header lines starting with '>'.
if i == n and line[0] != '>' and len(line) == 71:
line = line[:-3] + '\n'
full_line = True
elif i == n:
full_line = False
lines.write(line)
lines.flush()
name = lines.name
# Only attempt indexing when a truncation actually happened;
# FastaIndexingError is the expected outcome for each such file.
if full_line:
try:
Faidx(name)
indexed.append(True)
except FastaIndexingError:
indexed.append(False)
# No True entries means every truncated file failed to index, as intended.
assert not any(indexed)
def test_reindex_on_modification(self):
    """Regenerate the .fai index when the FASTA file is newer than it.

    Regression test for mdshw5/pyfaidx#50.
    """
    original = Faidx('data/genes.fasta')
    stamp = getmtime(original.indexname)
    original.close()
    # Push the FASTA mtime past the index mtime so reopening must rebuild.
    os.utime('data/genes.fasta', (stamp + 10, ) * 2)
    time.sleep(2)
    reopened = Faidx('data/genes.fasta')
    assert getmtime(reopened.indexname) > stamp
def __init__(self):
    """Open the bundled genes.fasta, padding out-of-range fetches with 'N'."""
    # `path` is a module-level location of the test data directory.
    fasta_path = os.path.join(path, 'data/genes.fasta')
    self.fasta = fasta_path
    self.faidx = Faidx(fasta_path, default_seq='N')
# NOTE(review): mid-script fragment — `parser`, `args.normal`, `args.chain`,
# `check_output_folder`, and `build_fasta` are defined outside this view,
# and original indentation appears flattened.
default='tumor_fa'
parser.add_argument('-o','--output',default=default,type=check_output_folder,metavar='DIR',
help='output directory [{}]'.format(default))
default=50
parser.add_argument('-w','--width',default=default,type=int,metavar='INT',
help='the line width of output fasta files [{}]'.format(default))
default=1
parser.add_argument('--cores',type=int,default=default,metavar='INT',
help='number of cores used to run the program [{}]'.format(default))
args=parser.parse_args()
# Fails if the directory already exists — presumably intentional so runs
# never clobber previous output; confirm before changing to makedirs.
os.mkdir(args.output,mode=0o755)
normal_fa=args.normal.split(',')
# Pre-build the .fai index for each normal FASTA before forking workers,
# so child processes don't race to create the same index files.
for fa in normal_fa:
pyfaidx.Faidx(fa)
pool=multiprocessing.Pool(processes=args.cores)
results=[]
# One async task per per-node chain file.
for node_chain in glob.glob(os.path.join(args.chain,'node*.chain')):
results.append(pool.apply_async(build_fasta,args=(args.output,node_chain,normal_fa,args.width)))
pool.close()
pool.join()
#handle exceptions if any
# .get() re-raises any exception that occurred inside a worker.
for result in results:
result.get()
# NOTE(review): script epilogue fragment — `t0` (start time) and `prog`
# (program name) are defined outside this view.
t1 = time.time()
print ("Total time running {}: {} seconds".format
(prog, str(t1-t0)))
def pyfaidx_faidx(n):
    """Benchmark pyfaidx.Faidx: index-build and read timings over n rounds.

    Relies on module globals: fa_file, headers, read_faidx, index, nreads.
    """
    print('timings for pyfaidx.Faidx')
    build_times, read_times = [], []
    for _ in range(n):
        start = time.time()
        fai = pyfaidx.Faidx(fa_file.name)
        build_times.append(time.time() - start)
        start = time.time()
        read_faidx(fai, headers)
        read_times.append(time.time() - start)
        # Delete the on-disk .fai so the next round rebuilds from scratch.
        os.remove(index)
    # profile memory usage and report timings
    tracemalloc.start()
    fai = pyfaidx.Faidx(fa_file.name)
    read_faidx(fai, headers)
    os.remove(index)
    print(tracemalloc.get_traced_memory())
    print(mean(build_times))
    # Per-read time in microseconds (10 reads per header batch — verify).
    print(mean(read_times)/nreads/10*1000*1000)
    tracemalloc.stop()
def pyfaidx_bgzf_faidx(n):
    """Benchmark pyfaidx.Faidx on the bgzf-compressed copy over n rounds.

    Relies on module globals: fa_file, headers, read_faidx, index, nreads.
    """
    print('timings for pyfaidx.Faidx with bgzf compression')
    build_times, read_times = [], []
    for _ in range(n):
        start = time.time()
        fai = pyfaidx.Faidx(fa_file.name + '.gz')
        build_times.append(time.time() - start)
        start = time.time()
        read_faidx(fai, headers)
        read_times.append(time.time() - start)
        # Delete the on-disk .fai so the next round rebuilds from scratch.
        os.remove(index)
    # profile memory usage and report timings
    tracemalloc.start()
    fai = pyfaidx.Faidx(fa_file.name + '.gz')
    read_faidx(fai, headers)
    os.remove(index)
    print(tracemalloc.get_traced_memory())
    print(mean(build_times))
    # Per-read time in microseconds (10 reads per header batch — verify).
    print(mean(read_times)/nreads/10*1000*1000)
    tracemalloc.stop()
def genomesize(fasta=None):
    '''
    Extract genome size from .fa file.

    Parameters
    ----------
    fasta : str
        Path to a FASTA file; pyfaidx builds or loads its .fai index.
        (Default of None will fail inside pyfaidx — callers must pass a path.)

    Returns
    -------
    int
        Total genome size: the sum of the reference length (rlen) of
        every record in the index.
    '''
    fa = pyfaidx.Faidx(fasta)
    # Iterate index values directly instead of looping over keys() and
    # re-looking each one up — same result, one dict lookup per record.
    return sum(record.rlen for record in fa.index.values())