Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _read_fanout(self, byte_offset):
"""Generate a fanout table from our data"""
d = self._cursor.map()
out = list()
append = out.append
for i in xrange(256):
append(unpack_from('>L', d, byte_offset + i * 4)[0])
# END for each entry
return out
def _read_fanout(self, byte_offset):
"""Generate a fanout table from our data"""
d = self._cursor.map()
out = list()
append = out.append
for i in xrange(256):
append(unpack_from('>L', d, byte_offset + i * 4)[0])
# END for each entry
return out
def offsets(self):
""":return: sequence of all offsets in the order in which they were written
**Note:** return value can be random accessed, but may be immmutable"""
if self._version == 2:
# read stream to array, convert to tuple
a = array.array('I') # 4 byte unsigned int, long are 8 byte on 64 bit it appears
a.fromstring(buffer(self._cursor.map(), self._pack_offset, self._pack_64_offset - self._pack_offset))
# networkbyteorder to something array likes more
if sys.byteorder == 'little':
a.byteswap()
return a
else:
return tuple(self.offset(index) for index in xrange(self.size()))
# END handle version
:param pack_sha: binary sha over the whole pack that we index
:return: sha1 binary sha over all index file contents"""
# sort for sha1 hash
self._objs.sort(key=lambda o: o[0])
sha_writer = FlexibleSha1Writer(write)
sha_write = sha_writer.write
sha_write(PackIndexFile.index_v2_signature)
sha_write(pack(">L", PackIndexFile.index_version_default))
# fanout
tmplist = list((0,) * 256) # fanout or list with 64 bit offsets
for t in self._objs:
tmplist[byte_ord(t[0][0])] += 1
# END prepare fanout
for i in xrange(255):
v = tmplist[i]
sha_write(pack('>L', v))
tmplist[i + 1] += v
# END write each fanout entry
sha_write(pack('>L', tmplist[255]))
# sha1 ordered
# save calls, that is push them into c
sha_write(b''.join(t[0] for t in self._objs))
# crc32
for t in self._objs:
sha_write(pack('>L', t[1] & 0xffffffff))
# END for each crc
tmplist = list()
:return: self"""
slen = len(self)
if slen < 2:
return self
i = 0
first_data_index = None
while i < slen:
dc = self[i]
i += 1
if dc.data is None:
if first_data_index is not None and i - 2 - first_data_index > 1:
# if first_data_index is not None:
nd = StringIO() # new data
so = self[first_data_index].to # start offset in target buffer
for x in xrange(first_data_index, i - 1):
xdc = self[x]
nd.write(xdc.data[:xdc.ts])
# END collect data
del(self[first_data_index:i - 1])
buf = nd.getvalue()
self.insert(first_data_index, DeltaChunk(so, len(buf), 0, buf))
slen = len(self)
i = first_data_index + 1
# END concatenate data
first_data_index = None
continue
# END skip non-data chunks
def _iter_objects(self, as_stream):
"""Iterate over all objects in our index and yield their OInfo or OStream instences"""
_sha = self._index.sha
_object = self._object
for index in xrange(self._index.size()):
yield _object(_sha(index), as_stream, index)
# END for each index