Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
converter.convert()
samFile = pysam.AlignmentFile(fileHandle.name, "r")
try:
# TODO suppressed because of pysam output:
# [W::sam_parse1] mapped mate cannot have zero coordinate;
# treated as unmapped
# and
# [W::sam_parse1] mapped mate cannot have zero coordinate;
# treated as unmapped
# see discussion in
# https://github.com/ga4gh/ga4gh-server/pull/789
with utils.suppressOutput():
convertedReads = list(samFile.fetch())
finally:
samFile.close()
samFile = pysam.AlignmentFile(readGroupSet.getDataUrl(), "rb")
try:
sourceReads = []
referenceName = reference.getName().encode()
readGroupName = readGroup.getLocalId().encode()
for readAlignment in samFile.fetch(referenceName):
tags = dict(readAlignment.tags)
if 'RG' in tags and tags['RG'] == readGroupName:
sourceReads.append(readAlignment)
finally:
samFile.close()
self.verifySamRecordsEqual(sourceReads, convertedReads)
######################
start_time = time.time()
# ===== Initialize input and output bam files =====
args.infile = str(args.infile)
args.outfile = str(args.outfile)
sscs_bam = pysam.AlignmentFile(args.infile, "rb")
dcs_bam = pysam.AlignmentFile(args.outfile, "wb", template=sscs_bam)
if re.search('dcs.sc', args.outfile) is None:
sscs_singleton_bam = pysam.AlignmentFile('{}.sscs.sc.singleton.bam'.format(args.outfile.split('.dcs.sc')[0]),
"wb", template=sscs_bam)
dcs_header = "DCS - Singleton Correction"
sc_header = " SC"
else:
sscs_singleton_bam = pysam.AlignmentFile('{}.sscs.singleton.bam'.format(args.outfile.split('.dcs')[0]),
"wb", template=sscs_bam)
dcs_header = "DCS"
sc_header = ""
stats = open('{}.stats.txt'.format(args.outfile.split('.dcs')[0]), 'a')
time_tracker = open('{}.time_tracker.txt'.format(args.outfile.split('.dcs')[0]), 'a')
# ===== Initialize dictionaries and counters=====
read_dict = collections.OrderedDict()
tag_dict = collections.defaultdict(int)
pair_dict = collections.defaultdict(list)
csn_pair_dict = collections.defaultdict(list)
unmapped = 0
unmapped_mate = 0
multiple_mapping = 0 # Secondary/supplementary reads
def find_circles(self):
begin = time.time()
os.chdir("%s" % self.dir)
bam = ps.AlignmentFile("%s" % self.bam,'rb')
print("Iterating trough the bam file")
output = []
for read in bam:
try:
if read.has_tag('XA'):
tag = read.get_tag('XA').split(';')[:-1]
read_edit_distance = read.get_tag('NM')
if read_edit_distance <= self.mismatch and len(tag) ==1:
def test_get_blocked_alignment():
bam = pysam.AlignmentFile('/home/jgarthur/sv/analysis/alignments/bwa_mem/short-reads/jun_jul.mdup.merge.mdup.bam', 'rb')
blocks = [GenomeInterval('1', 0, 100),
GenomeInterval('1', 110, 210),
GenomeInterval('1', 210, 2000)]
aln = pysam.AlignedSegment()
aln.pos = 0
aln.cigarstring = '50M'
aln.seq = 'A' * 50
aln.is_reverse = False
print(get_blocked_alignment(aln, blocks, 0, bam))
assert(get_blocked_alignment(aln, blocks, 0, bam) == ([1], 0))
assert(get_blocked_alignment(aln, blocks, 0, bam, is_rf=True) == ([0], 50))
aln.is_reverse = True
print(get_blocked_alignment(aln, blocks, 0, bam))
assert(get_blocked_alignment(aln, blocks, 0, bam) == ([0], 50))
assert(get_blocked_alignment(aln, blocks, 0, bam, is_rf=True) == ([1], 0))
:param ref_rle_fq: result of `process_ref_seq`
:param start: starting position within reference
:param end: ending position within reference
:returns: `Sample` object
"""
if self.is_compressed:
aln_to_pairs = partial(yield_compressed_pairs, ref_rle=ref_rle_fq)
elif self.max_hp_len == 1:
aln_to_pairs = get_pairs
else:
aln_to_pairs = partial(get_pairs_with_hp_len, ref_seq=ref_rle_fq)
# accumulate data in dicts
aln_counters = defaultdict(Counter)
ref_bases = dict()
with pysam.AlignmentFile(reads_bam, 'rb') as bamfile:
n_aln = bamfile.count(region.ref_name, region.start, region.end)
#print('{} reads aligned to ref segment.'.format(n_aln))
aln_reads = bamfile.fetch(region.ref_name, region.start, region.end)
if read_fraction is not None:
low, high = read_fraction
np.random.seed((int(now()) * region.start) % 2**32)
fraction = ((high - low) * np.random.random_sample(1) + low)[0]
aln_reads = [a for a in aln_reads]
n_reads = len(aln_reads)
n_reads_to_keep = max(int(fraction * n_reads), 1)
replace = n_reads_to_keep > n_reads
msg = "Resampling (replace {}) from {} to {} ({:.3f}) for {}"
logging.debug(msg.format(replace, n_reads, n_reads_to_keep, fraction, region))
aln_reads = np.random.choice(aln_reads, n_reads_to_keep, replace=replace)
start = region.start
def _set_quality(in_bam):
"""
change all quality to 255
"""
bam = pysam.AlignmentFile(in_bam, "rb")
out_file = utils.append_stem(in_bam, "_normqual")
if file_exists(out_file):
return out_file
with file_transaction(out_file) as tx_out:
with pysam.AlignmentFile(tx_out, "wb", template=bam) as out_handle:
for read in bam.fetch():
read.mapping_quality = 255
out_handle.write(read)
return out_file
def preview_bam(filename, kernel=None, style=None):
import pysam
res = ''
with pysam.AlignmentFile(filename, 'rb') as bam:
headers = bam.header
for record_type in ('RG', 'PG', 'SQ'):
if record_type not in headers:
continue
else:
records = headers[record_type]
res += record_type + ':\n'
for i, record in enumerate(records):
if isinstance(record, str):
res += ' ' + short_repr(record) + '\n'
elif isinstance(record, dict):
res += ' '
for idx, (k, v) in enumerate(record.items()):
if idx < 4:
res += '{}: {} '.format(k, short_repr(v))
elif idx == 4:
else:
with open(in_fn, 'rb') as f:
f_start = f.read(2)
if f_start == b'C\t' or f_start == b'U\t':
in_format = 'kraken'
elif f_start == b'#O':
in_format = 'histo'
if in_format == 'sam':
in_f = pysam.AlignmentFile(in_fn, "r")
elif in_format == 'bam':
in_f = pysam.AlignmentFile(in_fn, "rb")
elif in_format == 'cram':
in_f = pysam.AlignmentFile(in_fn, "rc")
elif in_format == 'uncompressed_bam':
in_f = pysam.AlignmentFile(in_fn, "ru")
elif in_format == 'kraken':
in_f = open(in_fn, 'r')
# no need to load assignments if input is a histogram -> go to load_histo
elif in_format == 'histo':
in_f = None
# let pysam assess the format
else:
in_f = pysam.AlignmentFile(in_fn)
return in_f, in_format
def validateSNPs(options, fastafile):
'''read SNPs in pileup format.'''
callers = []
for filename in options.filename_references:
for f in glob.glob(filename):
callers.append(pysam.SNPCaller(pysam.AlignmentFile(f, "rb"), fastafile))
if len(callers) == 0:
E.warning("no transcript data available")
else:
E.info("validating against %i reference files" % (len(callers)))
c = E.Counter()
outf = options.stdout
nfiles = len(callers)
outf.write("\t".join(
("contig", "pos", "reference", "genotype", "consensus_quality", "genotype_quality", "mapping_quality", "coverage")))
outf.write(
"\tstatus\tcalls\tfiltered\tmin_coverage\tmax_coverage\tmin_quality\tmax_quality\t")
def read_bam(fp, chrom=None, start=None, end=None):
with closing(pysam.AlignmentFile(fp), 'rb') as f:
bam_iter = f.fetch(chrom, start, end)
records = [(s.qname, s.flag, s.rname, s.pos, s.mapq,
s.cigarstring if s.mapq != 0 else np.nan,
s.rnext, s.pnext, s.tlen, s.seq, s.qual,
json.dumps(OrderedDict(s.tags))) for s in bam_iter]
df = pandas.DataFrame(records, columns=BAM_FIELDS)
return df