Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _method_pysam(self, *args, **kwargs):
    """Merge a sequence file and a quality file into one FASTQ file using pysam.

    ``self.infile[0]`` (sequences) and ``self.infile[1]`` (qualities) are both
    read with ``pysam.FastxFile`` and walked in lockstep. Every pair of
    records is written to ``self.outfile`` as a four-line FASTQ entry; the
    quality line is taken from the ``sequence`` field of the quality record.

    Both inputs must list the same identifiers in the same order. Iteration
    ends with the shorter input, so surplus records in the longer one are
    not written.

    :param args: not used by this method
    :param kwargs: not used by this method
    :raises SystemExit: with status 1 when ``self.infile[1]`` is None
    :raises AssertionError: when two paired records have different names
    """
    from pysam import FastxFile

    if self.infile[1] is None:
        _log.error("No quality file provided. Please add a quality file path ")
        sys.exit(1)

    # length must be equal and identifiers sorted similarly
    # All three handles live in one ``with`` so the readers are closed even
    # when a name mismatch aborts the conversion half-way.
    with open(self.outfile, "w") as fastq_out, \
            FastxFile(self.infile[0]) as seq_reader, \
            FastxFile(self.infile[1]) as qual_reader:
        for seq, qual in zip(seq_reader, qual_reader):
            # Explicit raise instead of ``assert`` so the check still runs
            # under ``python -O``; the exception type is unchanged.
            if seq.name != qual.name:
                raise AssertionError(
                    "record name mismatch: {0} != {1}".format(seq.name, qual.name))
            # The comment, when present, follows the name on the header line.
            if seq.comment:
                header = "{0} {1}".format(seq.name, seq.comment)
            else:
                header = seq.name
            fastq_out.write("@{0}\n{1}\n+\n{2}\n".format(header,
                                                         seq.sequence,
                                                         qual.sequence))
# NOTE(review): fragment -- the enclosing function header and the original
# indentation are not visible here; the comments describe only these statements.
# One worker process per slot in `processes`; each call draws a separate
# value from seed_rng for that worker.
p_list = [Process(target=worker,
args=(i, read_module, read_model, long_qname_table, in_queue, out_queue, seed_rng.randint(SEED_MAX)))
for i in range(processes)]
for p in p_list:
p.start()
logger.debug('Starting writer process')
# A single extra process runs `writer`; it is handed the three output
# targets and out_queue.
wr = Process(target=writer, args=(fastq1_out, sidecar_out, fastq2_out, out_queue))
wr.start()
t0 = time.time()
# Burn through file
logger.debug('Starting to read FASTQ file')
# fastq2_in is optional; when present both readers are advanced together by
# zip(), which stops at the shorter input.
fastq_l = [pysam.FastxFile(fastq1_in)]
if fastq2_in is not None: fastq_l += [pysam.FastxFile(fastq2_in)]
# Pre-set so `cnt` is defined even if the loop body never runs.
cnt = 0
for cnt, reads in enumerate(zip(*fastq_l)):
# [(qname, seq, seq) ... ]
# The name comes from the first file's record only.
in_queue.put((reads[0].name,) + tuple(r.sequence for r in reads))
# Progress message every 100000 templates (including the very first one).
if cnt % 100000 == 0:
logger.debug('Read {} templates'.format(cnt))
logger.debug('Stopping child processes')
# One stop code is queued per worker process.
for i in range(processes):
in_queue.put(__process_stop_code__)
for p in p_list:
p.join()
logger.debug('Stopping writer')
# The stop code for the writer is queued only after every worker was joined.
out_queue.put(__process_stop_code__)
"""From 2 FASTA files (reverse and forward) adapters, returns 2-columns file
This is useful for some tools related to adapter removal that takes as input
this kind of format
:param str file1: FASTA format
:param str file2: FASTA format (optional)
The files must have a one-to-one mapping
"""
# Reader for the first FASTA file.
# NOTE(review): neither f1 nor f2 is closed in the lines shown here.
f1 = pysam.FastxFile(file1)
# With no output filename every line is printed instead of written to a file.
if output_filename is not None:
fout = open(output_filename, "w")
if file2:
f2 = pysam.FastxFile(file2)
# Two-file mode: records are paired by position and emitted as
# "<seq1> <seq2>"; zip() stops at the shorter file.
for read1, read2 in zip(f1, f2):
txt = "%s %s" % (read1.sequence, read2.sequence)
if output_filename is None:
print(txt)
else:
fout.write(txt+"\n")
else:
# Single-file mode: one sequence per output line.
for read1 in f1:
txt = "%s" % read1.sequence
if output_filename is None:
print(read1.sequence)
else:
fout.write(txt+"\n")
# Close the output file only if one was opened above.
if output_filename is not None:
fout.close()
def next(self): # python 2
    """Return the next record from the underlying ``FastxFile`` reader.

    Whenever reading stops -- the reader is exhausted, it fails, or the user
    interrupts -- the file is closed and reopened, so a later iteration
    starts again from the first record.

    :returns: the record produced by ``next(self._fasta)``
    :raises StopIteration: when the reader is exhausted or raises any
        ordinary exception
    :raises KeyboardInterrupt: re-raised after the reader has been rewound
    """
    try:
        return next(self._fasta)
    except KeyboardInterrupt:
        # This should allow developers to break a loop that takes too long
        # through the reads to run forever
        self._fasta.close()
        self._fasta = FastxFile(self._fasta.filename)
        # No record was read, so there is nothing to return: propagate the
        # interrupt instead of falling through to an unbound local.
        raise
    except Exception:
        # Exhaustion (StopIteration) and read errors both end the iteration;
        # SystemExit and GeneratorExit are deliberately left to propagate.
        self._fasta.close()
        self._fasta = FastxFile(self._fasta.filename)
        raise StopIteration
def buildReadDictionary(filename):
    """Map every read name in a FASTA/FASTQ file to its sequence and quality.

    :param str filename: path of the file to read with ``pysam.FastxFile``
    :returns: dict of ``name -> (sequence, quality)``
    :raises OSError: if ``filename`` does not exist
    :raises ValueError: if the same read name occurs more than once
    """
    if not os.path.exists(filename):
        raise OSError("file not found: %s" % filename)

    fastq2sequence = {}
    # The context manager closes the reader on the normal path and when a
    # duplicate name aborts the scan.
    with pysam.FastxFile(filename) as fastqfile:
        for x in fastqfile:
            if x.name in fastq2sequence:
                raise ValueError(
                    "read %s duplicate - can not unstrip" % x.name)
            fastq2sequence[x.name] = (x.sequence, x.quality)
    return fastq2sequence
def __init__(self, filename, verbose=False):
    """Open an uncompressed FASTA file and count its records.

    :param str filename: path of the input file; must not end with ``.gz``
    :param bool verbose: not used in this constructor
    :raises ValueError: if ``filename`` ends with ``.gz``
    """
    if filename.endswith(".gz"):
        raise ValueError("Must be decompressed.")
    self._fasta = FastxFile(filename)
    self.filename = filename
    logger.info("Reading input fasta file...please wait")
    # Count with a separate, short-lived reader so self._fasta stays on the
    # first record; the context manager closes it and no list of records is
    # built just to take its length.
    with FastxFile(filename) as counter:
        self._N = sum(1 for _ in counter)
# NOTE(review): fragment -- the enclosing function header is not visible; the
# recursive call below suggests this is the body of check_file_exists(fp).
if not _is_link_or_exists(fp):
raise IOError('File not present even after chown: {}'.format(os.path.abspath(fp)))
if os.path.islink(fp): # recursively follow links
logger.debug('File is symlink, following link. File: {}'.format(fp))
# support links to absolute and relative paths
target_path = os.readlink(fp)
if not os.path.isabs(target_path):
# A relative link target is resolved against the directory holding the link.
target_path = os.path.join(os.path.dirname(fp), target_path)
logger.debug('File is relative symlink: {}'.format(target_path))
# Re-run the check on the link target and return whatever that call returns.
return check_file_exists(target_path)
else:
logger.debug('File exists! File: {} Size: {}'.format(fp, os.path.getsize(fp)))
# For these two file names only, the file is opened with pysam and the name
# of its first record is logged.
# NOTE(review): next(fx) would raise if the file holds no records -- confirm
# that this is the intended failure mode.
if os.path.basename(fp) in {'basecalls.fasta', 'consensus.fasta'}:
with pysam.FastxFile(fp) as fx:
first_rec_name = next(fx).name
logger.debug('First fastx record: {}'.format(first_rec_name))
return fp
def get_fastq_read_ids(ref_path: str) -> Set[str]:
    """Return the set of whitespace-stripped record names found in a fastq file."""
    with pysam.FastxFile(ref_path) as records:
        return {record.name.strip() for record in records}
def iterate_over_fastx(fn, persist=True):
    """Iterate over every record of a FASTA/FASTQ file and return the count.

    :param fn: file name handed to ``pysam.FastxFile``
    :param persist: forwarded unchanged as the ``persist`` argument of
        ``pysam.FastxFile``
    :returns: number of records yielded by the reader
    """
    # The context manager closes the reader once all records were consumed;
    # the records are still collected into a list, as before, so the amount
    # of work done per call is unchanged.
    with pysam.FastxFile(fn, persist=persist) as fastx:
        return len(list(fastx))