Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
1. Count newlines before start
2. Count newlines to end
3. Difference of 1 and 2 is number of newlines in [start:end]
4. Seek to start position, taking newlines into account
5. Read to end position, return sequence
"""
assert start == int(start)
assert end == int(end)
try:
i = self.index[rname]
except KeyError:
raise FetchError("Requested rname {0} does not exist! "
"Please check your FASTA file.".format(rname))
start0 = start - 1 # make coordinates [0,1)
if start0 < 0:
raise FetchError(
"Requested start coordinate must be greater than 1.")
seq_len = end - start0
# Calculate offset (https://github.com/samtools/htslib/blob/20238f354894775ed22156cdd077bc0d544fa933/faidx.c#L398)
newlines_before = int(
(start0 - 1) / i.lenc * (i.lenb - i.lenc)) if start0 > 0 and i.lenc else 0
newlines_to_end = int(end / i.lenc * (i.lenb - i.lenc)) if i.lenc else 0
newlines_inside = newlines_to_end - newlines_before
seq_blen = newlines_inside + seq_len
bstart = i.offset + newlines_before + start0
if seq_blen < 0 and self.strict_bounds:
raise FetchError("Requested coordinates start={0:n} end={1:n} are "
"invalid.\n".format(start, end))
elif end > i.rlen and self.strict_bounds:
raise FetchError("Requested end coordinate {0:n} outside of {1}. "
"\n".format(end, rname))
raise FetchError(
"Requested start coordinate must be greater than 1.")
seq_len = end - start0
# Calculate offset (https://github.com/samtools/htslib/blob/20238f354894775ed22156cdd077bc0d544fa933/faidx.c#L398)
newlines_before = int(
(start0 - 1) / i.lenc * (i.lenb - i.lenc)) if start0 > 0 and i.lenc else 0
newlines_to_end = int(end / i.lenc * (i.lenb - i.lenc)) if i.lenc else 0
newlines_inside = newlines_to_end - newlines_before
seq_blen = newlines_inside + seq_len
bstart = i.offset + newlines_before + start0
if seq_blen < 0 and self.strict_bounds:
raise FetchError("Requested coordinates start={0:n} end={1:n} are "
"invalid.\n".format(start, end))
elif end > i.rlen and self.strict_bounds:
raise FetchError("Requested end coordinate {0:n} outside of {1}. "
"\n".format(end, rname))
with self.lock:
if self._bgzf: # We can't add to virtual offsets, so we need to read from the beginning of the record and trim the beginning if needed
self.file.seek(i.offset)
chunk = start0 + newlines_before + newlines_inside + seq_len
chunk_seq = self.file.read(chunk).decode()
seq = chunk_seq[start0 + newlines_before:]
else:
self.file.seek(bstart)
# If the requested sequence exceeds len(FastaRecord), return as much as possible
if bstart + seq_blen > i.bend and not self.strict_bounds:
seq_blen = i.bend - bstart
# Otherwise it should be safe to read the sequence
if seq_blen > 0:
def fill_buffer(self, name, start, end):
    """Try to cache the sequence for ``name`` over ``start``..``end``.

    The sequence is read with ``self.from_file(name, start, end)``. On
    success the ``'seq'``, ``'start'``, ``'end'`` and ``'name'`` entries of
    ``self.buffer`` are overwritten with the fetched sequence and the
    arguments that produced it.

    A ``FetchError`` raised by ``from_file`` is swallowed on purpose: the
    buffer is then left exactly as it was and nothing is reported to the
    caller.

    Returns:
        None in every case.
    """
    try:
        fetched = self.from_file(name, start, end)
    except FetchError:
        # Best-effort fill: a failed fetch keeps the previous buffer contents.
        return
    # Same keys, same order of assignment as four individual item writes.
    self.buffer.update(seq=fetched, start=start, end=end, name=name)
"Please check your FASTA file.".format(rname))
start0 = start - 1 # make coordinates [0,1)
if start0 < 0:
raise FetchError(
"Requested start coordinate must be greater than 1.")
seq_len = end - start0
# Calculate offset (https://github.com/samtools/htslib/blob/20238f354894775ed22156cdd077bc0d544fa933/faidx.c#L398)
newlines_before = int(
(start0 - 1) / i.lenc * (i.lenb - i.lenc)) if start0 > 0 and i.lenc else 0
newlines_to_end = int(end / i.lenc * (i.lenb - i.lenc)) if i.lenc else 0
newlines_inside = newlines_to_end - newlines_before
seq_blen = newlines_inside + seq_len
bstart = i.offset + newlines_before + start0
if seq_blen < 0 and self.strict_bounds:
raise FetchError("Requested coordinates start={0:n} end={1:n} are "
"invalid.\n".format(start, end))
elif end > i.rlen and self.strict_bounds:
raise FetchError("Requested end coordinate {0:n} outside of {1}. "
"\n".format(end, rname))
with self.lock:
if self._bgzf: # We can't add to virtual offsets, so we need to read from the beginning of the record and trim the beginning if needed
self.file.seek(i.offset)
chunk = start0 + newlines_before + newlines_inside + seq_len
chunk_seq = self.file.read(chunk).decode()
seq = chunk_seq[start0 + newlines_before:]
else:
self.file.seek(bstart)
# If the requested sequence exceeds len(FastaRecord), return as much as possible
if bstart + seq_blen > i.bend and not self.strict_bounds:
key_fn_test = self.key_function(
"TestingReturnType of_key_function")
if not isinstance(key_fn_test, string_types):
raise KeyFunctionError(
"key_function argument should return a string, not {0}".
format(type(key_fn_test)))
except Exception as e:
pass
self.filt_function = filt_function
assert duplicate_action in ("stop", "first", "last", "longest",
"shortest", "drop")
self.duplicate_action = duplicate_action
self.as_raw = as_raw
self.default_seq = default_seq
if self._bgzf and self.default_seq is not None:
raise FetchError(
"The default_seq argument is not supported with using BGZF compression. Please decompress your FASTA file and try again."
)
if self._bgzf:
self.strict_bounds = True
else:
self.strict_bounds = strict_bounds
self.split_char = split_char
self.one_based_attributes = one_based_attributes
self.sequence_always_upper = sequence_always_upper
self.index = OrderedDict()
self.lock = Lock()
self.buffer = dict((('seq', None), ('name', None), ('start', None),
('end', None)))
if not read_ahead or isinstance(read_ahead, integer_types):
self.read_ahead = read_ahead
elif not isinstance(read_ahead, integer_types):