Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, inFile, ploidy=1, parser=pysam.asVCF()):
# using old-style class superclass calling here
# since TabixReader is derived from pysam.TabixFile
# which is itself an old-style class (due to Cython version?)
TabixReader.__init__(self, inFile, parser=parser)
# when pysam uses new-style classes, we can replace with:
#super(VcfReader, self).__init__(inFile, parser=parser)
assert ploidy in (1, 2)
self.ploidy = ploidy
self.clens = []
self.sample_names = None
for line in self.header: # pylint: disable=E1101
line = bytes_to_string(line)
if line.startswith('##contig='):
line = line[13:-1]
c = line.split(',')[0]
clen = int(line.split('=')[1])
def __init__(self, inFile, ploidy=1, parser=pysam.asVCF()):
super(VcfReader, self).__init__(inFile, parser=parser)
assert ploidy in (1,2)
self.ploidy = ploidy
self.clens = []
self.sample_names = None
for line in self.header:
line = bytes_to_string(line)
if line.startswith('##contig='):
line = line[13:-1]
c = line.split(',')[0]
clen = int(line.split('=')[1])
self.clens.append((c,clen))
elif line.startswith('#CHROM'):
row = line.split('\t')
self.sample_names = row[9:]
self.clens = dict(self.clens)
def __init__(self, inFile, ploidy=1, parser=pysam.asVCF()):
super(VcfReader, self).__init__(inFile, parser=parser)
assert ploidy in (1,2)
self.ploidy = ploidy
self.clens = []
self.sample_names = None
for line in self.header:
if line.startswith('##contig='):
line = line[13:-1]
c = line.split(',')[0]
clen = int(line.split('=')[1])
self.clens.append((c,clen))
elif line.startswith('#CHROM'):
row = line.split('\t')
self.sample_names = row[9:]
self.clens = dict(self.clens)
assert self.sample_names
def select_vcf_records(bed_records, vcf_records):
"""Returns an iterable of VCF records, corresponding to the contents of each
region specified by the BED records. Records are returned at most once, even
if covered by multiple BED records."""
contigs = frozenset(vcf_records.contigs)
vcf_parser = pysam.asVCF()
# Timer class used processing progress; meant primarily for BAM files
progress = timer.BAMTimer(None)
# Cache of positions observed for this contig, to prevent returning
# positions in overlapping regions multiple times
contig_cache = None
contig_cache_name = None
for bed in sorted(bed_records):
if bed.contig not in contigs:
# Skip contigs for which no calls have been made (e.g. due to
# low coverage. Otherwise Pysam raises an exception.
continue
elif contig_cache_name != bed.contig:
# Reset cache per contig, to save memory
def read_tabix_VCF(lines):
return read_tabix(lines, pysam.asVCF())
def process_piece(filename_vcf, chrom, chrom_length, sample_index, chain_info, diploid, passed, quality, vcf_keep, vcf_discard_file):
ret = {'chrom': chrom, 'stats': {}, 'chain_info': chain_info}
stats = OrderedDict()
stats['ACCEPTED'] = 0
if vcf_keep:
vcf_discard = open(vcf_discard_file, "w")
line_no = 0
try:
LOG.info("Processing Chromosome {0}...".format(chrom))
tb = pysam.TabixFile(filename_vcf)
for vcf_rec in tb.fetch(chrom, parser=pysam.asVCF()):
line_no += 1
try:
gt = parse_gt_new(vcf_rec, sample_index)
except:
LOG.info("Unable to parse record, improper VCF file?")
continue
LOG.debug('\n')
LOG.debug(vcf_rec)
LOG.debug(gt)
LOG.debug(vcf_rec[sample_index])
if passed and 'PASS' not in vcf_rec.FILTER:
LOG.debug("TOSSED: FILTERED ON PASS")
def _read_files(args):
in_header = True
has_filters = False
vcf_parser = pysam.asVCF()
for filename in args.filenames:
with open_ro(filename, "rb") as handle:
for line in handle:
if not line.startswith(b"#"):
in_header = False
line = line.rstrip(b"\n\r")
vcf = vcf_parser(line, len(line))
if args.reset_filter:
vcf.filter = "."
yield vcf
elif in_header:
if not (line.startswith(b"##") or has_filters):
has_filters = True
for item in sorted(vcffilter.describe_filters(args).items()):
print('##FILTER=' % item)
def filter_vcfs(genotype, contig, start, end):
if contig in genotype.contigs:
parser = pysam.asVCF()
# This raises a ValueError if the VCF does not
# contain any entries for the specified contig.
for vcf in genotype.fetch(contig, start, end, parser=parser):
if vcf.filter in ("PASS", "."):
yield vcf