Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def content(mode, hexsha):
    """Return the blob-like object to use for one side of a tree entry.

    * ``hexsha is None``    -> an empty blob.
    * ``S_ISGITLINK(mode)`` -> a synthetic blob containing
      ``b"Subproject commit <hexsha>\\n"``.
    * anything else         -> whatever ``store[hexsha]`` holds.
    """
    if hexsha is not None:
        if not S_ISGITLINK(mode):
            # Ordinary entry: fetch the real object.
            return store[hexsha]
        # Git links have no object in this store; describe them textually.
        return Blob.from_string(b"Subproject commit " + hexsha + b"\n")
    # No sha at all: stand in with an empty blob.
    return Blob.from_string(b'')
def pack_info_create(pack_data, pack_index):
    """Build a compressed summary of the objects contained in a pack.

    The pack is opened via ``Pack.from_objects(pack_data, pack_index)`` and
    every object is mapped (by ``obj.id``) to a small record:

    * commit -> ``(type_num, parents, tree)``
    * tree   -> ``(type_num, [(sha, name, is_not_dir), ...])`` with git-link
      entries left out
    * blob   -> ``None``
    * tag    -> ``(type_num, obj.object[1])``

    Objects of any other type are not recorded.

    :return: ``zlib.compress(json_dumps(info))`` for the mapping above.
    """
    pack = Pack.from_objects(pack_data, pack_index)
    info = {}
    for obj in pack.iterobjects():
        kind = obj.type_num
        if kind == Commit.type_num:
            record = (kind, obj.parents, obj.tree)
        elif kind == Tree.type_num:
            children = []
            for name, mode, sha in obj.items():
                if S_ISGITLINK(mode):
                    # Git links are not listed among the tree's children.
                    continue
                children.append((sha, name, not stat.S_ISDIR(mode)))
            record = (kind, children)
        elif kind == Blob.type_num:
            record = None
        elif kind == Tag.type_num:
            record = (kind, obj.object[1])
        else:
            # Unknown object type: leave it out of the summary.
            continue
        info[obj.id] = record
    return zlib.compress(json_dumps(info))
def content(mode, hexsha):
    """Resolve a tree entry to the object whose content should be used.

    Entries without a sha produce an empty blob, entries for which
    ``S_ISGITLINK(mode)`` is true produce a one-line "Subproject commit"
    blob, and every other entry is read from ``store``.
    """
    if hexsha is None:
        payload = b''
    elif S_ISGITLINK(mode):
        payload = b"Subproject commit " + hexsha + b"\n"
    else:
        # Real object: no synthetic blob needed.
        return store[hexsha]
    return Blob.from_string(payload)
def _find_content_rename_candidates(self):
    """Populate ``self._candidates`` with possible content-based renames.

    ``self._candidates`` is always reset to a fresh list. Unless
    ``self._should_find_content_renames()`` says otherwise, every entry in
    ``self._deletes`` is compared with every entry in ``self._adds`` that
    has the same ``S_IFMT`` file type; pairs whose similarity score exceeds
    ``self._rename_threshold`` are appended as ``(-score, TreeChange)``.
    """
    found = self._candidates = []
    # TODO: Optimizations:
    # - Compare object sizes before counting blocks.
    # - Skip if delete's S_IFMT differs from all adds.
    # - Skip if adds or deletes is empty.
    # Match C git's behavior of not attempting to find content renames if
    # the matrix size exceeds the threshold.
    if not self._should_find_content_renames():
        return

    block_cache = {}
    check_paths = self._rename_threshold is not None
    for removed in self._deletes:
        if S_ISGITLINK(removed.old.mode):
            # Git links don't exist in this repo.
            continue
        removed_sha = removed.old.sha
        removed_obj = self._store[removed_sha]
        # Count the deleted object's blocks once, up front, and share the
        # result with the scoring helper through the cache.
        block_cache[removed_sha] = _count_blocks(removed_obj)
        for added in self._adds:
            same_kind = (stat.S_IFMT(removed.old.mode) ==
                         stat.S_IFMT(added.new.mode))
            if same_kind:
                added_obj = self._store[added.new.sha]
                score = _similarity_score(removed_obj, added_obj,
                                          block_cache=block_cache)
                if score > self._rename_threshold:
                    change_type = self._rename_type(check_paths, removed,
                                                    added)
                    change = TreeChange(change_type, removed.old, added.new)
                    # The score is stored negated alongside the change.
                    found.append((-score, change))
# Collect the child nodes of the directory at ``path`` in this changeset.
# Raises ChangesetError when ``path`` is not a directory (NodeKind.DIR).
# NOTE(review): this snippet is cut off after the trailing ``else:`` below, so
# the handling of other object types and the return value are not visible.
def get_nodes(self, path):
if self._get_kind(path) != NodeKind.DIR:
raise ChangesetError("Directory does not exist for revision %s at "
" '%s'" % (self.revision, path))
# Resolve the (fixed-up) path to an object id and load that tree.
path = self._fix_path(path)
id = self._get_id_for_path(path)
tree = self.repository._repo[id]
# Directory-like and file nodes are accumulated in separate lists.
dirnodes = []
filenodes = []
als = self.repository.alias
# NOTE: the loop variable ``stat`` holds the entry's mode (it is passed to
# S_ISGITLINK and used as the FileNode mode); ``id`` is rebound to the
# entry's object id and shadows the builtin.
for name, stat, id in tree.iteritems():
# Git-link entries become SubModuleNode objects in the directory list and
# are not looked up in the repository.
if objects.S_ISGITLINK(stat):
dirnodes.append(SubModuleNode(name, url=None, changeset=id,
alias=als))
continue
obj = self.repository._repo.get_object(id)
# Build the entry's path; entries under the empty path use the bare name.
if path != '':
obj_path = '/'.join((path, name))
else:
obj_path = name
# Record the mode for a path only the first time that path is seen.
if obj_path not in self._stat_modes:
self._stat_modes[obj_path] = stat
if isinstance(obj, objects.Tree):
dirnodes.append(DirNode(obj_path, changeset=self))
elif isinstance(obj, objects.Blob):
filenodes.append(FileNode(obj_path, changeset=self, mode=stat))
# NOTE(review): the body of this branch is missing from the visible source.
else:
def next(self):
    """Return the next ``(sha, name)`` pair, or ``None`` when nothing is left.

    Entries are popped from ``self.objects_to_send`` until one whose sha is
    not yet in ``self.sha_done`` is found. For a non-leaf entry the object
    is loaded from ``self.object_store`` and its references are queued via
    ``self.add_todo``. The sha is then marked done and ``self.progress`` is
    called with the running count.
    """
    # Skip over entries that have already been handled.
    while self.objects_to_send:
        sha, name, leaf = self.objects_to_send.pop()
        if sha not in self.sha_done:
            break
    else:
        # Queue exhausted without finding anything new.
        return None

    if not leaf:
        obj = self.object_store[sha]
        if isinstance(obj, Commit):
            self.add_todo([(obj.tree, "", False)])
        elif isinstance(obj, Tree):
            # Queue every child except git links; non-directories are
            # flagged as leaves.
            children = [
                (entry_sha, entry_name, not stat.S_ISDIR(entry_mode))
                for entry_name, entry_mode, entry_sha in obj.iteritems()
                if not S_ISGITLINK(entry_mode)
            ]
            self.add_todo(children)
        elif isinstance(obj, Tag):
            self.add_todo([(obj.object[1], None, False)])

    if sha in self._tagged:
        # Also queue the entry recorded in ``_tagged`` for this sha.
        self.add_todo([(self._tagged[sha], None, True)])

    self.sha_done.add(sha)
    message = "counting objects: %d\r" % len(self.sha_done)
    self.progress(message.encode('ascii'))
    return (sha, name)