# read items individually to prevent hitting possible sys-limits
count = 0
for ostream in ostream_reader:
    assert isinstance(ostream, OStream)
    count += 1
# END for each ostream
assert ostream_reader.task().error() is None
assert count == ni

# get info about our items
reader = IteratorReader(istream.binsha for istream in istreams)
info_reader = db.info_async(reader)
count = 0
for oinfo in info_reader:
    assert isinstance(oinfo, OInfo)
    count += 1
# END for each oinfo instance
assert count == ni

# combined read-write using a converter:
# add 2500 items and obtain their output streams
nni = 2500
reader = IteratorReader(istream_generator(offset=ni, ni=nni))
istream_to_sha = lambda istreams: [istream.binsha for istream in istreams]

istream_reader = db.store_async(reader)
istream_reader.set_post_cb(istream_to_sha)

ostream_reader = db.stream_async(istream_reader)
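
# For comparison, a minimal synchronous sketch of the same store-then-read
# round trip, assuming gitdb's plain db.store/db.stream interface: db.store
# fills in each IStream's binsha, which db.stream can then read back.
# Illustrative only, not part of the test above.
for istream in istream_generator(offset=ni + nni, ni=10):
    db.store(istream)
    assert isinstance(db.stream(istream.binsha), OStream)
# END synchronous sketch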
def test_reading(self):
    gdb = GitDB(fixture_path('../../.git/objects'))

    # we have packs and loose objects; an alternates database doesn't necessarily exist
    assert 1 < len(gdb.databases()) < 4

    # access should be possible
    gitdb_sha = hex_to_bin("5690fd0d3304f378754b23b098bd7cb5f4aa1976")
    assert isinstance(gdb.info(gitdb_sha), OInfo)
    assert isinstance(gdb.stream(gitdb_sha), OStream)
    assert gdb.size() > 200
    sha_list = list(gdb.sha_iter())
    assert len(sha_list) == gdb.size()

    # This is actually a test for compound functionality, but it doesn't
    # have a separate test module.
    # Test partial shas - this one is uneven and quite short.
    assert gdb.partial_to_complete_sha_hex('155b6') == \
        hex_to_bin("155b62a9af0aa7677078331e111d0f7aa6eb4afc")

    # mix even and uneven hexsha lengths
    for i, binsha in enumerate(sha_list):
        assert gdb.partial_to_complete_sha_hex(bin_to_hex(binsha)[:8 - (i % 2)]) == binsha
    # END for each sha
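
# Usage sketch (illustrative, not part of the test above): resolving a
# user-supplied short hexsha with gitdb's exceptions - BadObject for unknown
# prefixes, AmbiguousObjectName when more than one object matches.
from gitdb.exc import AmbiguousObjectName, BadObject

def resolve_short_sha(gdb, partial_hexsha):
    """Return the full binary sha for partial_hexsha, or None if it cannot
    be resolved unambiguously. resolve_short_sha is a hypothetical helper."""
    try:
        return gdb.partial_to_complete_sha_hex(partial_hexsha)
    except (AmbiguousObjectName, BadObject):
        return None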
class ODeltaPackInfo(OPackInfo):
    """Adds delta specific information:
    either the 20 byte sha which points to some object in the database,
    or the negative offset from the pack_offset, so that pack_offset - delta_info
    yields the pack offset of the base object"""
    __slots__ = tuple()

    def __new__(cls, packoffset, type, size, delta_info):
        return tuple.__new__(cls, (packoffset, type, size, delta_info))

    #{ Interface

    @property
    def delta_info(self):
        return self[3]

    #} END interface
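
# Sketch (illustrative helper, not part of gitdb): how a consumer would
# interpret delta_info for git's two delta kinds. A 20 byte value is the
# REF_DELTA base sha; otherwise it is the OFS_DELTA distance, and the
# docstring's pack_offset - delta_info formula locates the base object.
def base_location(info):
    delta_info = info.delta_info
    if isinstance(delta_info, bytes) and len(delta_info) == 20:
        return ('by-sha', delta_info)                         # base may live in another pack
    return ('by-offset', info.pack_offset - delta_info)       # base in the same pack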
class OStream(OInfo):
    """Base for object streams retrieved from the database, providing additional
    information about the stream.
    Generally, ODB streams are read-only, as objects are immutable"""
    __slots__ = tuple()

    def __new__(cls, sha, type, size, stream, *args, **kwargs):
        """Helps with the initialization of subclasses"""
        return tuple.__new__(cls, (sha, type, size, stream))

    def __init__(self, *args, **kwargs):
        tuple.__init__(self)

    #{ Stream Reader Interface

    def read(self, size=-1):
        return self[3].read(size)
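
# Quick usage sketch (illustrative, not from gitdb's tests): any file-like
# object can back an OStream; the zero sha below is a placeholder value.
from io import BytesIO

data = b"hello"
ostream = OStream(b"\x00" * 20, "blob", len(data), BytesIO(data))
assert ostream.read() == b"hello"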
if as_stream:
    if type_id not in delta_types:
        packstream = self._pack.stream(offset)
        return OStream(sha, packstream.type, packstream.size, packstream.stream)
    # END handle non-deltas

    # produce a delta stream containing all info.
    # To prevent it from applying the deltas when querying the size,
    # we extract it from the delta stream ourselves
    streams = self.collect_streams_at_offset(offset)
    dstream = DeltaApplyReader.new(streams)
    return ODeltaStream(sha, dstream.type, None, dstream)
else:
    if type_id not in delta_types:
        return OInfo(sha, type_id_to_type_map[type_id], uncomp_size)
    # END handle non-deltas

    # deltas are a little tougher - unpack the first bytes to obtain
    # the actual target size, as opposed to the size of the delta data
    streams = self.collect_streams_at_offset(offset)
    buf = streams[0].read(512)
    offset, src_size = msb_size(buf)
    offset, target_size = msb_size(buf, offset)

    # collect the streams to obtain the actual object type
    if streams[-1].type_id in delta_types:
        raise BadObject(sha, "Could not resolve delta object")
    return OInfo(sha, streams[-1].type, target_size)
# END handle stream
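
# The two sizes decoded by msb_size above are git's little-endian base-128
# varints from the delta header. A standalone sketch of that decoding
# (msb_size_sketch is illustrative, not gitdb's implementation):
def msb_size_sketch(data, offset=0):
    """Decode one varint: each byte contributes its low 7 bits, and a set
    most-significant bit means another byte follows. Returns the offset just
    past the varint and the decoded size, mirroring msb_size's usage above."""
    size = 0
    i = 0
    while True:
        c = data[offset + i]
        size |= (c & 0x7f) << (7 * i)
        i += 1
        if not c & 0x80:
            return offset + i, size

assert msb_size_sketch(bytes([0x91, 0x02])) == (2, 273)  # 17 | (2 << 7)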
def info(self, sha):
    hexsha, typename, size = self._git.get_object_header(bin_to_hex(sha))
    return OInfo(hex_to_bin(hexsha), typename, size)
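
# This variant resolves the header through the git command; the same fields
# are what `git cat-file --batch-check` prints. A standalone sketch
# (illustrative, not GitPython's implementation; object_header is a
# hypothetical helper that assumes the object exists):
import subprocess

def object_header(repo_path, hexsha):
    """Return (sha, typename, size) for hexsha; cat-file prints
    '<sha> missing' for unknown objects, which this sketch does not handle."""
    out = subprocess.run(
        ["git", "-C", repo_path, "cat-file", "--batch-check"],
        input=hexsha + "\n", capture_output=True, text=True, check=True,
    ).stdout.strip()
    sha, typename, size = out.split()
    return sha, typename, int(size)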
def info(self, sha):
    m = self._map_loose_object(sha)
    try:
        typ, size = loose_object_header_info(m)
        return OInfo(sha, typ, size)
    finally:
        if hasattr(m, 'close'):
            m.close()
    # END assure release of system resources
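
# loose_object_header_info reads the header at the start of the zlib-inflated
# loose object file: the ASCII string '<type> <size>' followed by a NUL byte.
# A minimal standalone sketch (parse_loose_header is illustrative only;
# gitdb memory-maps the file and inflates lazily instead):
import zlib

def parse_loose_header(raw):
    data = zlib.decompress(raw)
    header, _, _body = data.partition(b"\x00")
    typename, size = header.split(b" ")
    return typename.decode("ascii"), int(size)

assert parse_loose_header(zlib.compress(b"blob 2\x00hi")) == ("blob", 2)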