def test_split_seq(self):
    """ Fetch sequence by blocks """
    fa = Fasta('data/chr17.hg19.part.fa')
    gene = Fasta("data/gene.bed12.fasta")
    expect = gene[list(gene.keys())[0]][:].seq
    bed = "data/gene.bed12"
    with open(bed) as fi:
        record = fi.readline().strip().split("\t")
        chrom = record[0]
        start = int(record[1])
        strand = record[5]
    # parse bed12 format: blockSizes (field 10) and blockStarts (field 11)
    # are comma-terminated lists, so the trailing empty element is dropped
    starts = [int(x) for x in record[11].split(",")[:-1]]
    sizes = [int(x) for x in record[10].split(",")[:-1]]
    starts = [start + x for x in starts]  # block starts are relative to chromStart
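
# --- Added sketch (not part of the original test): how the parsed BED12
# blocks could be stitched back into the spliced sequence with pyfaidx.
# Fasta slicing is 0-based and end-exclusive, so each exon block is
# fa[chrom][s:s + size]; negating a Sequence yields its reverse complement.
def splice_blocks(fa, chrom, strand, starts, sizes):
    from pyfaidx import Sequence
    seq = "".join(fa[chrom][s:s + size].seq for s, size in zip(starts, sizes))
    if strand == "-":
        seq = (-Sequence(name=chrom, seq=seq)).seq  # reverse complement
    return seq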
def test_buffer_value(self):
    # read_ahead is a number of bases and is expected to be an int;
    # a float such as 0.5 should be rejected by pyfaidx.
    Fasta(self.fasta, read_ahead=0.5)
def test_reverse_iter(self):
    expect = list(chain(*([line[::-1] for line in record][::-1]
                          for record in Fasta('data/genes.fasta', as_raw=True))))
    result = list(chain(*([line for line in reversed(record)]
                          for record in Fasta('data/genes.fasta', as_raw=True))))
    for a, b in zip(expect, result):
        print(a, b)
    assert expect == result
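
# --- Added note (illustrative, hypothetical path): with as_raw=True the
# iterated lines are plain strings rather than pyfaidx.Sequence objects,
# which is why line[::-1] above reverses the characters directly.
def first_raw_line(path):
    record = next(iter(Fasta(path, as_raw=True)))
    return next(iter(record))  # a str, e.g. 'ATCG...'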
self.json_conf["pick"]["fragments"]["remove"] = self.json_conf["pick"]["run_options"].pop(key)
new_home = "fragments/remove"
else:
self.json_conf["pick"]["clustering"][key] = self.json_conf["pick"]["run_options"].pop(key)
new_home = "clustering/{}".format(key)
warns = PendingDeprecationWarning(
"""The \"{}\" property has now been moved to pick/{}. \
Please update your configuration files in the future.""".format(
key, new_home))
self.logger.warning(warns)
if self.json_conf.get("pick", {}).get("alternative_splicing", {}).get("pad", False) is True:
    # When padding is requested, check that the reference genome is present
    self.logger.debug("Checking for the presence of the reference genome")
    try:
        _ = pyfaidx.Fasta(self.json_conf["reference"]["genome"])
    except (pyfaidx.FastaIndexingError, FileNotFoundError, pyfaidx.FastaNotFoundError):
        self.logger.error("Transcript padding cannot be executed without a valid genome file. "
                          "Please either disable the padding or provide a valid genome sequence.")
        sys.exit(1)
    self.logger.debug("Valid reference genome found")
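
# --- Added sketch: the same genome check factored into a standalone helper,
# reusing the exception types caught above.
def genome_is_valid(path):
    try:
        pyfaidx.Fasta(path)
        return True
    except (pyfaidx.FastaIndexingError, FileNotFoundError, pyfaidx.FastaNotFoundError):
        return False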
self.context = multiprocessing.get_context()
if self.json_conf["pick"]["scoring_file"].endswith((".pickle", ".model")):
    with open(self.json_conf["pick"]["scoring_file"], "rb") as forest:
        self.regressor = pickle.load(forest)
    if not isinstance(self.regressor["scoring"], (RandomForestRegressor, RandomForestClassifier)):
        exc = TypeError("Invalid regressor provided, type: %s" % type(self.regressor["scoring"]))
        self.logger.critical(exc)
        return
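
# --- Added sketch (assumed pickle layout, inferred from the isinstance check
# above): the scoring file is expected to unpickle to a dict whose "scoring"
# entry holds a scikit-learn random forest. A matching writer might look like:
def save_scoring_file(path, model):
    from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
    if not isinstance(model, (RandomForestRegressor, RandomForestClassifier)):
        raise TypeError("Invalid regressor provided, type: %s" % type(model))
    with open(path, "wb") as forest:
        pickle.dump({"scoring": model}, forest)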
else:
    return set(string.split(','))

parser = argparse.ArgumentParser(
    description='A simple script that retrieves the FASTA sequences from a file, given a list of IDs.')
parser.add_argument("-v", "--reverse", action="store_true", default=False,
                    help="Retrieve entries which are not in the list, as in grep -v (a homage).")
parser.add_argument('list', type=check_type,
                    help='File with the list of IDs to recover, one per line. '
                         'Alternatively, names separated by commas.')
parser.add_argument('fasta', type=argparse.FileType('r'), help='FASTA file.')
parser.add_argument('out', type=argparse.FileType('w'), nargs='?', default=sys.stdout,
                    help='Optional output file.')
args = parser.parse_args()
if isinstance(args.list, IOBase):
    ids = set(line.rstrip() for line in args.list)
else:
    ids = args.list
# Close the handle opened by argparse; pyfaidx reopens the file by name.
args.fasta.close()
fasta = pyfaidx.Fasta(args.fasta.name)
for name in ids:
    assert name in fasta
    print(">{0}".format(name), file=args.out)
    print(*textwrap.wrap(str(fasta[name]), width=60),
          sep="\n", file=args.out)
def pyfaidx_fasta(n):
    print('timings for pyfaidx.Fasta')
    ti = []  # per-iteration index/build times
    tf = []  # per-iteration read times
    for _ in range(n):
        t = time.time()
        f = pyfaidx.Fasta(fa_file.name)
        ti.append(time.time() - t)
        t = time.time()
        read_dict(f, headers)
        tf.append(time.time() - t)
        os.remove(index)
    # profile memory usage and report timings
    tracemalloc.start()
    f = pyfaidx.Fasta(fa_file.name)
    read_dict(f, headers)
    os.remove(index)
    print(tracemalloc.get_traced_memory())
    print(mean(ti))
    print(mean(tf) / nreads / 10 * 1000 * 1000)
    tracemalloc.stop()
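
# --- Added sketch: read_dict, headers, index and nreads are defined outside
# this fragment; a plausible read_dict that exercises the index by pulling a
# slice from every record might look like this (an assumption, not the
# benchmark's actual helper):
def read_dict(f, headers):
    for header in headers:
        f[header][:100]  # fetch a slice so seeking and buffering are timed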
def parse_fasta(fa_file):
    try:
        fa_object = pyfaidx.Fasta(fa_file)
    except pyfaidx.UnsupportedCompressionFormat:
        # pyfaidx can handle bgzip but not gzip; so we just hack it here and
        # unzip the file for checksumming, then rezip it for the rest of the
        # asset build.
        # TODO: streamline this to avoid repeated compress/decompress
        # In refgenie we feed this function an uncompressed, newly built
        # FASTA file, so compression issues are not relevant.
        os.system("gunzip {}".format(fa_file))
        fa_file_unzipped = fa_file.replace(".gz", "")
        fa_object = pyfaidx.Fasta(fa_file_unzipped)
        os.system("gzip {}".format(fa_file_unzipped))
    return fa_object
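
# --- Added sketch: since pyfaidx can read bgzip (BGZF) but not plain gzip,
# an alternative to the gunzip/gzip round-trip above is to recompress once
# with bgzip (assumes the htslib `bgzip` executable is on PATH):
def recompress_as_bgzip(fa_file):
    import subprocess
    subprocess.run(["gunzip", fa_file], check=True)
    subprocess.run(["bgzip", fa_file[:-len(".gz")]], check=True)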
hmm_dir (str): Path to the directory where all HMMs are stored as .pickle files.
result_file (str): Path to the result file.
max_time (str): Max run time in seconds.

Raises:
    Exception: If the pickle ``sequences_file`` cannot be loaded.
    Exception: If ``hmm_dir`` does not exist and cannot be created.
'''
start = datetime.datetime.now()
max_time, time_interval, next_time = int(max_time), int(time_interval), int(next_time)
try:
    l_sequence = Fasta(sequences_file)
except Exception:
    raise Exception(
        "Cannot load putative pickle file sequences_file: {}".format(sequences_file))
if not os.path.isdir(hmm_dir):
    try:
        os.makedirs(hmm_dir)
    except Exception:
        raise Exception(
            "hmm_dir does not exist and could not be created: {}".format(hmm_dir))
try:
    with open(hmm_annotation_file, 'rb') as fh:
        dHMM_annotation = pickle.load(fh)
except Exception:
    raise Exception(
        "Cannot load hmm_annotation_file: {}".format(hmm_annotation_file))
def make_fastg(node_mode):
    global node_length
    if node_mode == "read":
        dict_ = follows
        idx = Fasta('formatted_preads4falcon.fasta')
    else:
        dict_ = ctg_follows
        idx = Fasta('p_ctg.fa')
    for source, sinks in dict_.items():
        if node_mode == "read":
            all_source_nodes.append(source)
            node_length = str(read_len_dict[source])
        else:
            all_ctg_source_nodes.append(source)
            node_length = str(contigs[source][3])
            global read_density
            read_density = read_density_in_ctg(source)
        header = []
        header.append(">" + headerify(source) + ":")
        for element in sinks:
            if node_mode == "read":
                node_length = str(read_len_dict[element])
            elif node_mode == "contig":
                node_length = str(contigs[element][3])
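
# --- Added sketch: headerify is defined outside this fragment; one plausible
# FASTG-style node label that embeds the globals set above could be (purely
# hypothetical):
def headerify(node):
    return "NODE_{}_length_{}_cov_{}".format(node, node_length, read_density)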