Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _readue(self, pos):
    """Decode one unsigned exponential-Golomb code starting at bit ``pos``.

    The code is a run of zero bits, a single one bit, and then as many
    payload bits as there were zeros.

    Returns a ``(codenum, pos)`` tuple where ``pos`` is the bit position
    immediately after the code.

    Raises ReadError if the end of the bitstring is encountered while
    reading the code.

    """
    start = pos
    try:
        # Walk over the zero prefix until the first set bit.
        while not self[pos]:
            pos += 1
    except IndexError:
        raise bitstring.ReadError("Read off end of bitstring trying to read code.")

    zero_run = pos - start
    if zero_run == 0:
        # A lone '1' bit encodes the value 0.
        return 0, pos + 1

    end = pos + zero_run + 1
    if end > self.len:
        raise bitstring.ReadError("Read off end of bitstring trying to read code.")

    # Value = (2**zero_run - 1) + the zero_run payload bits after the '1'.
    offset = (1 << zero_run) - 1
    payload = self._readuint(zero_run, pos + 1)
    return offset + payload, end
# Excerpt from the interior of a trace-processing loop; the enclosing
# function header, the loop statement and the outer `try:` that pairs with
# the bare `except:` near the end are not part of this excerpt.
if self.trace_queue.empty():
# make sure we can see the shutdown flag
continue
# Take the next raw trace item and extract its payload with trace_re.
new_data = self.trace_queue.get()
m = trace_re.match(new_data)
if m:
# Buffer the first captured group as a "0x"-prefixed string.
self.trace_buffer.append("0x" + m.group(1))
else:
raise ValueError(
"Got a really weird trace packet " + new_data)
# Go round again until DWT_PKTSIZE_BITS bits are available in the buffer.
if not self.has_bits_to_read(self.trace_buffer,
DWT_PKTSIZE_BITS):
continue
try:
# peek() is presumably non-consuming; the buffer is advanced explicitly
# by the read() calls below once the packet has been classified.
pkt = self.trace_buffer.peek(DWT_PKTSIZE_BITS).bytes
except ReadError:
# NOTE(review): this handler only logs, so control falls through to the
# ord(pkt[0]) check below with `pkt` unbound or left over from an earlier
# pass. A `continue` looks intended here -- confirm.
# NOTE(review): consider rewording this log message.
logger.error("Fuck you length is " + repr(
len(self.trace_buffer)) + " " + repr(DWT_PKTSIZE_BITS))
# NOTE(review): ord() on pkt[0] assumes indexing yields a 1-character
# string (Python 2 style bytes); on Python 3 bytes it yields an int -- confirm.
if ord(pkt[0]) == 0x0E: # exception packets
# Drop the 0x0E marker byte and hand the rest to the dispatcher.
pkt = pkt[1:]
self.dispatch_exception_packet(pkt)
# eat the bytes
self.trace_buffer.read(DWT_PKTSIZE_BITS)
# the first byte didn't match, rotate it out
else:
self.trace_buffer.read(8)
# NOTE(review): a bare `except:` also traps KeyboardInterrupt and SystemExit;
# `except Exception:` is usually what is wanted.
except:
logger.exception("Error processing trace")
# Set the _closed event and log that the thread is exiting.
self._closed.set()
logger.debug("Interrupt thread exiting...")
"""Return interpretation of next bits as unsigned interleaved exponential-Golomb code.
Raises ReadError if the end of the bitstring is encountered while
reading the code.
"""
# Body of the interleaved exponential-Golomb reader described by the
# docstring above; its `def` line is not part of this excerpt.
try:
# The accumulator starts at 1; the `codenum -= 1` after the loop removes
# that offset again.
codenum = 1
# Bits are consumed in pairs: while the bit at `pos` is 0, the bit after it
# is shifted into the low end of codenum.
while not self[pos]:
pos += 1
codenum <<= 1
codenum += self[pos]
pos += 1
# Step past the 1 bit that ended the loop.
pos += 1
except IndexError:
# Running off the end of the data is reported as a ReadError.
raise bitstring.ReadError("Read off end of bitstring trying to read code.")
codenum -= 1
# Decoded value plus the position of the first bit after the code.
return codenum, pos
def clean(self):
    """Validate a newly uploaded replay file and record its replay ID.

    Nothing is done for objects that already have a primary key, or that
    have no file attached. Otherwise the file is parsed with ``Pyrope`` and
    the ``Id`` value from its header is stored on ``self.replay_id``.

    Raises:
        ValidationError: if the file cannot be parsed as a replay, or if a
            ``Replay`` with the same ``replay_id`` already exists.
    """
    if self.pk:
        return

    if not self.file:
        return

    # Ensure we're at the start of the file as `clean()` can sometimes
    # be called multiple times (for some reason..)
    self.file.seek(0)

    # Deliberately unused: kept as a local so it shows up in the frame
    # captured by the exception handler.
    file_url = self.file.url  # noqa: F841

    try:
        replay = Pyrope(self.file.read())
    except bitstring.ReadError:
        raise ValidationError("The file you selected does not seem to be a valid replay file.")

    replay_id = replay.header['Id']

    # Check if this replay has already been uploaded.
    replays = Replay.objects.filter(replay_id=replay_id)

    if replays.exists():
        # The literal is single-quoted so the double quotes around the
        # href value do not terminate it.
        raise ValidationError(mark_safe(
            'This replay has already been uploaded, <a href="{}">you can view it here</a>.'.format(
                replays[0].get_absolute_url()
            )
        ))

    self.replay_id = replay_id
def _load(self, fp, callback):
    """Read the preamble and header block from ``fp``, reporting progress.

    ``callback`` is invoked as ``callback(message, current_step,
    total_steps, flag)``; the flag is ``False`` only when the SRAM init
    check failure is being reported.

    Sets ``self.preamble``, ``self.header_block`` and
    ``self.active_project_number``.

    Raises:
        exceptions.ImportException: if parsing the header block raises
            ``bitstring.ReadError``.
        ValueError: if the header's ``sram_init_check`` is not ``b'jk'``.
    """
    # read preamble + decompress blocks + "all done"
    total_steps = 3
    current_step = 0

    callback("Reading preamble", current_step, total_steps, True)

    self.preamble = fp.read(self.START_OFFSET)
    raw_header = fp.read(blockutils.BLOCK_SIZE)

    try:
        parsed_header = bread.parse(
            raw_header, bread_spec.compressed_sav_file)
    except bitstring.ReadError as err:
        raise exceptions.ImportException(err)
    self.header_block = parsed_header

    init_check = self.header_block.sram_init_check
    if init_check != b'jk':
        message = (
            "SRAM init check bits incorrect (should be 'jk', was '%s')" %
            init_check)
        # Report the failure through the callback before raising.
        callback(message, current_step, total_steps, False)
        raise ValueError(message)

    self.active_project_number = self.header_block.active_file

    current_step += 1
    callback("Decompressing", current_step, total_steps, True)
def __init__(self, _bytes):
    """Split a byte string into little-endian unsigned 32-bit words.

    Args:
        _bytes: the raw data (``bytes`` or ``bytearray``). If its length is
            not a multiple of four it is padded with NUL bytes up to the
            next word boundary.

    Sets ``self.pos`` to 0 and ``self.data`` to the list of decoded words.
    """
    import struct

    self.pos = 0

    remainder = len(_bytes) % 4
    if remainder:
        # Pad with a *bytes* literal: adding the text literal '\0' to a
        # bytes object raises TypeError on Python 3.
        _bytes = _bytes + b'\0' * (4 - remainder)

    # Decode every word in one call rather than reading one word at a time
    # until a ReadError signals the end of the data.
    word_count = len(_bytes) // 4
    self.data = list(struct.unpack('<%dI' % word_count, _bytes))
# Excerpt: `f` and `_Strings` are bound outside this excerpt.
# The first 32-bit little-endian integer read from `f` is stored as the version.
_Strings.version = f.read('intle:32')
try:
# Keep reading tables until a read raises ReadError (handled below).
while True:
# Each table begins with its entry count.
table_len = f.read('intle:32')
tbl = {}
for _ in range(table_len):
# Entry header: the string id, then the string length.
str_id, str_len = f.readlist('intle:32, intle:32')
# Read str_len * 8 bits as hex text, drop the last four hex digits, and
# convert the remainder to bytes.
# NOTE(review): str_len is presumably a byte count and the dropped two
# bytes a terminator -- confirm against the file format.
parsed_str = binascii.a2b_hex(f.read('hex:{}'.format(str_len * 8))[:-4])
tbl[str_id] = parsed_str.decode("utf-8").strip()
# A table is appended only after all of its entries were read; a ReadError
# part-way through a table discards that partial table.
_Strings._tables.append(tbl)
except ReadError:
# EOF
pass
Raises ReadError if the end of the bitstring is encountered while
reading the code.
"""
# Body of an exponential-Golomb reader (same statements as `_readue`); its
# `def` line is not part of this excerpt.
oldpos = pos
try:
# Advance over zero bits until the first set bit.
while not self[pos]:
pos += 1
except IndexError:
# Running off the end of the data is reported as a ReadError.
raise bitstring.ReadError("Read off end of bitstring trying to read code.")
leadingzeros = pos - oldpos
# Base value for a code with this many leading zeros: 2**leadingzeros - 1.
codenum = (1 << leadingzeros) - 1
if leadingzeros > 0:
# The set bit is followed by `leadingzeros` more bits; make sure they all
# lie inside the data before reading them.
if pos + leadingzeros + 1 > self.len:
raise bitstring.ReadError("Read off end of bitstring trying to read code.")
codenum += self._readuint(leadingzeros, pos + 1)
pos += leadingzeros + 1
else:
# No leading zeros: the code is a single set bit and the value is 0.
assert codenum == 0
pos += 1
# Decoded value plus the position of the first bit after the code.
return codenum, pos