Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_realign_align_params(genome_source):
read = pysam.AlignedSegment()
read.query_sequence = genome_source.get_seq("chr1", 101, 114, "+")
# the default seed length should be too short for this to align
alns = genome_source.align(Alignment(read))
assert len(alns) == 0
genome_source.bwa.SetMinSeedLength(13)
# but now it should align
alns = genome_source.align(Alignment(read))
assert len(alns) == 1
assert alns[0].cigarstring == "14M"
assert alns[0].reference_start == 101
template_bam = pysam.Samfile(bamfiles[0])
header = tk_bam.get_bam_header_as_dict(template_bam)
for bam_fn in bamfiles[1:]:
bam = pysam.Samfile(bam_fn)
header['SQ'].extend(bam.header['SQ'])
bam_out = pysam.Samfile(out_bamfile, "wb", header=header)
for bam_fn in bamfiles:
bam = pysam.Samfile(bam_fn)
for rec in bam:
# Convert to string and back to replace the old tid with the new one
# This appears to be the only way to do this with pysam (sometime after 0.9)
rec = pysam.AlignedSegment.fromstring(rec.to_string(),
header=pysam.AlignmentHeader.from_dict(header))
# I don't know why we do this.
if rec.is_unmapped and not rec.mate_is_unmapped:
rec.reference_id = rec.next_reference_id
elif not rec.is_unmapped and rec.mate_is_unmapped:
rec.next_reference_id = rec.reference_id
elif rec.is_unmapped and rec.mate_is_unmapped:
rec.reference_id = -1
rec.next_reference_id = -1
# Pull fields out of qname, and make tags, writing out to bam_out
qname_split = rec.qname.split(cr_fastq.AugmentedFastqHeader.TAG_SEP)
rec.qname = qname_split[0]
tags = rec.tags
return qname, flag, rname, pos, \
mapping_quality, cigarstring, \
rnext, pnext, template_length, seq, qual, _tg
# So close, pysam.tostring, so close
def _tags(_t):
_tl = _t.split(':')
if _tl[1] == 'i':
_tl[2] = int(_tl[2])
elif _tl[1] == 'f':
_tl[2] = float(_tl[2])
return _tl[0], _tl[2], _tl[1]
r = pysam.AlignedSegment()
r.qname, r.flag, r.rname, r.pos, \
r.mapping_quality, r.cigarstring, \
r.rnext, r.pnext, r.template_length, r.seq, r.qual, tags = _split(s)
r.set_tags([_tags(t) for t in tags])
return r
# Sanity check
if end - start < 1:
if b.is_reverse:
start = end - 1
else:
end = start + 1
if start < 0:
start = 0
if end > chromDict[b.reference_name]:
end = chromDict[b.reference_name]
if end - start < 1:
return None
# create a new read
b2 = pysam.AlignedSegment()
b2.query_name = b.query_name
b2.flag = b.flag
b2.reference_id = b.reference_id
b2.reference_start = start
b2.mapping_quality = b.mapping_quality
b2.cigar = ((0, end - start),) # Returned cigar is only matches
if tLen < 0:
b2.template_length = tLen - deltaTLen
else:
b2.template_length = tLen + deltaTLen
b2.next_reference_id = b.next_reference_id
b2.next_reference_start = b.next_reference_start
if b.is_proper_pair:
if b2.is_read2 and b2.is_reverse:
b2.next_reference_start += args.shift[0]
elif not b2.is_read2 and b2.is_reverse:
GenomeInterval('1', 110, 210),
GenomeInterval('1', 210, 2000)]
aln = pysam.AlignedSegment()
aln.pos = 0
aln.cigarstring = '50M'
aln.seq = 'A' * 50
aln.is_reverse = False
print(get_blocked_alignment(aln, blocks, 0, bam))
assert(get_blocked_alignment(aln, blocks, 0, bam) == ([1], 0))
assert(get_blocked_alignment(aln, blocks, 0, bam, is_rf=True) == ([0], 50))
aln.is_reverse = True
print(get_blocked_alignment(aln, blocks, 0, bam))
assert(get_blocked_alignment(aln, blocks, 0, bam) == ([0], 50))
assert(get_blocked_alignment(aln, blocks, 0, bam, is_rf=True) == ([1], 0))
aln = pysam.AlignedSegment()
aln.rname = 0
aln.pos = 90
aln.seq = 'A' * 40
aln.cigarstring = '20M20S'
aln.set_tag('SA', '1,191,-,20M20S,60,0;', 'Z')
print(get_blocked_alignment(aln, blocks, 0, bam))
assert(get_blocked_alignment(aln, blocks, 0, bam) == ([1, 2], -90))
assert(get_blocked_alignment(aln, blocks, 0, bam, is_rf=True) == ([3, 0], -80))
os.path.isfile( os.path.join(out_dir,'multi.sorted.bam')) ):
#for read in tqdm(in_bam):
counter = 0
for read in in_bam:
# poor man's progress bar
counter += 1
if not counter % 10**6:
logger.debug('tagged %i alignments'%counter)
read_tag = read_tagger(read)
## skip reads with unassigned tagger
if read_tag==-1:
continue
read.tags += [('RT', read_tag)] ## add the tag
tagged_read = pysam.AlignedSegment()
tagged_read.query_name = read.query_name
tagged_read.query_sequence = 'N'
tagged_read.flag = read.flag
tagged_read.reference_id = read.reference_id
tagged_read.reference_start = read_tag - 1 # 0-based leftmost coordinate
tagged_read.mapping_quality = read.mapping_quality
tagged_read.cigar = ((0,1),)
tagged_read.template_length = read.template_length
tagged_read.query_qualities = pysam.qualitystring_to_array("<")
tagged_read.tags = read.tags
read_len = sum([i[1] for i in read.cigar if i[0]==0])
tagged_read.tags += [('RL', read_len)]
# add lib_type check
if lib_type != "unstranded":
tagged_read.is_reverse = (read.is_reverse) ^ (lib_type!="sense")
def write_perfect_reads(qname, rg_id, long_qname_table, ref_dict, read_data, cigar_v2,
fp):
"""Given reads begining to a template, write out the perfect alignments to file
:param qname:
:param rg_id:
:param long_qname_table:
:param ref_dict: dict containing reference names mapped to ref_id
:param read_data: [x1, x2, ... ] where xi is (seq, qual)
and, e.g. [x1, x2] constitute a pair, if the input is paired end
:param cigar_v2: output CIGARs in V2 format
:param fp: pysam file pointer to write out
:return:
"""
reads = [
pysam.AlignedSegment()
for _ in range(len(read_data))
]
for n, (ri, rd, read) in enumerate(zip(parse_qname(qname, long_qname_table), read_data, reads)):
read.qname = qname
read.reference_id = ref_dict[ri.chrom]
read.pos = ri.pos - 1
read.cigarstring = ri.cigar if cigar_v2 else cigarv2_v1(ri.cigar)
read.mapq = 60
read.set_tag('RG', rg_id, value_type='Z')
# TODO: ask aligner people what the graph cigar tag is
# TODO: Set this as unmapped?
if ri.strand:
read.is_reverse = 1
read.mate_is_reverse = 0
if False, find first common reference matching base
:return: -max_d <= d_err <= max_d + 2
d_err = max_d + 1 if wrong chrom
d_err = max_d + 2 if unmapped
"""
if r.is_unmapped:
d_err = max_d + 2
elif r.reference_name != ri.chrom:
d_err = max_d + 1
else:
# This is what we score against when we are 'strict' or inside an insertion
# Or we can't find a common reference matching base in the correct and aligned reads
correct_pos, aligned_pos = ri.pos - 1, r.pos
# If we are not strict AND not in an insertion we use first common reference matching base
if not strict and ri.special_cigar is None and not is_simple_case(ri.cigar, r.cigarstring):
rc = pysam.AlignedSegment()
rc.pos, rc.cigarstring = ri.pos - 1, cigarv2_v1(ri.cigar)
_correct_pos, _aligned_pos = find_first_common_reference_matching_base_positions(rc, r)
if _correct_pos is not None:
correct_pos, aligned_pos = _correct_pos, _aligned_pos
d_err = max(min((aligned_pos - correct_pos), max_d), -max_d)
return d_err