Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_commit(data, marker=b'Default', blob=None):
    """Assemble a blob, a one-file tree, a commit and a tag labelled by ``marker``.

    ``data`` is a sequence of previously created objects; when it is
    non-empty its last element must be a ``Commit`` and becomes the sole
    parent of the new commit.  ``blob`` is used as the file content when
    given, otherwise a blob derived from ``marker`` is created.

    NOTE(review): no ``return`` statement is visible and the tag is only
    partially populated -- this snippet looks truncated; confirm against the
    full source before relying on it.
    """
    if not blob:
        blob = Blob.from_string(b'The blob content ' + marker)

    # A tree holding a single regular file (mode 0o100644) named after the marker.
    file_tree = Tree()
    file_tree.add(b"thefile_" + marker, 0o100644, blob.id)

    commit = Commit()
    if data:
        previous = data[-1]
        assert isinstance(previous, Commit)
        commit.parents = [previous.id]
    commit.tree = file_tree.id

    # The same identity is used for author and committer.
    identity = b"John Doe " + marker + b" "
    commit.author = identity
    commit.committer = identity

    # parse_timezone returns a sequence; only its first element is used here.
    zone = parse_timezone(b'-0200')[0]
    now = int(time())
    commit.commit_time = now
    commit.author_time = now
    commit.commit_timezone = zone
    commit.author_timezone = zone
    commit.encoding = b"UTF-8"
    commit.message = b"The commit message " + marker

    annotated = Tag()
    annotated.tagger = b"john@doe.net"
    annotated.message = b"Annotated tag"
def create_repo(cls, repo_name, author=None):
    """Create a new repository named repo_name.

    The repository is initialised as a bare repository through
    ``cls.storage`` and receives one initial commit that points at a freshly
    constructed (empty) tree; the 'master' branch is advanced to that commit.

    NOTE(review): the first parameter is ``cls``, so this is presumably
    decorated as a classmethod outside the visible lines -- confirm.

    :param repo_name: name of the repository to create.
    :param author: passed unchanged to the ``cls`` constructor.
    :returns: the new ``cls`` instance bound to the created repository.
    :raises NameError: if ``cls.storage`` reports that ``repo_name`` already
        exists.
    """
    if cls.storage.repo_exists(repo_name):
        # Name the offending repository so the failure can be diagnosed.
        raise NameError('Repository %r already exists' % (repo_name,))
    repoman = cls(author)
    repoman._repo = cls.storage.init_bare(repo_name)
    tree = Tree()
    commit = repoman._create_commit()
    commit.tree = tree.id
    commit.message = 'Initialization commit'
    repoman.advance_branch(commit, tree, 'master')
    return repoman
def create_repo(cls, repo_name, author=None):
    """Create a new repository named repo_name.

    The repository is initialised as a bare repository through
    ``cls.storage`` and receives one initial commit that points at a freshly
    constructed (empty) tree; the 'master' branch is advanced to that commit.

    NOTE(review): the first parameter is ``cls``, so this is presumably
    decorated as a classmethod outside the visible lines -- confirm.

    :param repo_name: name of the repository to create.
    :param author: passed unchanged to the ``cls`` constructor.
    :returns: the new ``cls`` instance bound to the created repository.
    :raises NameError: if ``cls.storage`` reports that ``repo_name`` already
        exists.
    """
    if cls.storage.repo_exists(repo_name):
        # Name the offending repository so the failure can be diagnosed.
        raise NameError('Repository %r already exists' % (repo_name,))
    repoman = cls(author)
    repoman._repo = cls.storage.init_bare(repo_name)
    tree = Tree()
    commit = repoman._create_commit()
    commit.tree = tree.id
    commit.message = 'Initialization commit'
    repoman.advance_branch(commit, tree, 'master')
    return repoman
def build_tree(path):
    """Store the tree for ``path`` (children first) and return its id.

    ``trees[path]`` maps each basename to either a nested dict (a
    sub-directory, built recursively) or a ``(mode, sha)`` pair.  ``trees``,
    ``object_store`` and ``pathjoin`` are not defined in this block; they are
    presumably supplied by the enclosing scope.
    """
    node = Tree()
    for child_name, child in trees[path].items():
        if not isinstance(child, dict):
            # Leaf entry: already carries its mode and sha.
            child_mode, child_sha = child
        else:
            # Sub-directory: build it first so its id can be referenced here.
            child_mode = stat.S_IFDIR
            child_sha = build_tree(pathjoin(path, child_name))
        node.add(child_name, child_mode, child_sha)
    object_store.add_object(node)
    return node.id
return build_tree(b'')
def __init__(self, git_repo, git_id=None, parent=None, name=None):
    """Bind this object to a git tree, creating an empty one when needed.

    :param git_repo: repository object; its ``object_store`` and ``tree()``
        are used here.
    :param git_id: id of the git tree to load, or ``None`` to create and
        store a blank tree first.
    :param parent: stored as ``self.parent``.
    :param name: stored as ``self.name`` and used in debug log messages.
    """
    self.parent, self.name, self.git = parent, name, git_repo

    if git_id is None:
        # No tree supplied: write a blank one to the object store and use it.
        log.debug('tree %r: creating blank git tree', self.name)
        blank = dulwich.objects.Tree()
        self.git.object_store.add_object(blank)
        git_id = blank.id

    log.debug('tree %r: loading git tree %r', self.name, git_id)
    self._git_tree = self.git.tree(git_id)

    # Bookkeeping state, all starting out empty.
    self._ctx_count = 0
    self._loaded = {}
    self._dirty = {}
def get_differing_files(repo, past, current):
    """Yield the relative paths that differ between two trees.

    ``past`` and ``current`` are tree objects whose ``entries()`` yields
    ``(mode, name, sha)`` triples; either may be ``None``, meaning "no tree".
    ``repo.get_object(sha)`` is used to resolve entries so sub-trees can be
    walked recursively.

    Yields:
      * every added or removed entry name, followed by everything below it
        when the entry is a tree;
      * for entries present on both sides with different shas: the nested
        differences when both sides are trees, otherwise the entry name
        itself (followed by the contents of whichever side is a tree).

    Paths of nested entries are built with ``os.path.join``.
    """
    past_files = {}
    current_files = {}
    if past is not None:
        past_files = {name: sha for mode, name, sha in past.entries()}
    if current is not None:
        current_files = {name: sha for mode, name, sha in current.entries()}

    added = set(current_files) - set(past_files)
    removed = set(past_files) - set(current_files)
    changed = [name for name in past_files
               if name in current_files
               and past_files[name] != current_files[name]]

    for name in added:
        yield name
        obj = repo.get_object(current_files[name])
        if isinstance(obj, objects.Tree):
            for item in get_differing_files(repo, None, obj):
                yield os.path.join(name, item)

    for name in removed:
        yield name
        obj = repo.get_object(past_files[name])
        if isinstance(obj, objects.Tree):
            for item in get_differing_files(repo, obj, None):
                yield os.path.join(name, item)

    for name in changed:
        past_obj = repo.get_object(past_files[name])
        current_obj = repo.get_object(current_files[name])
        past_is_tree = isinstance(past_obj, objects.Tree)
        current_is_tree = isinstance(current_obj, objects.Tree)

        if past_is_tree and current_is_tree:
            # Directory on both sides: only its nested differences matter.
            for item in get_differing_files(repo, past_obj, current_obj):
                yield os.path.join(name, item)
            continue

        # A modified file, or an entry that switched between file and
        # directory: the entry itself differs.
        yield name
        if past_is_tree:
            # The old directory's contents are gone.
            for item in get_differing_files(repo, past_obj, None):
                yield os.path.join(name, item)
        if current_is_tree:
            # The new directory's contents are new.
            for item in get_differing_files(repo, None, current_obj):
                yield os.path.join(name, item)
def update_sub(self, name, value):
    """Set or delete one record in this node's sub-tree and propagate upward.

    :param name: record name; must end in ``.ls`` (with mode ``0o100644``)
        or ``.sub`` (with mode ``0o040000``).
    :param value: two-item sequence whose first item is the mode.  When the
        second item is ``None`` the record is deleted, otherwise ``value``
        is stored under the quoted name.

    After the change the sub-tree is added to the git object store, its id
    is recorded in ``self.sub_id``, and the parent is asked to update its
    ``<self.name>.sub`` record to point at the new id.
    """
    # Modes are written with the ``0o`` prefix: the legacy ``0100644`` form
    # is a syntax error on Python 3.
    assert ((name.endswith('.ls') and value[0] == 0o100644) or
            (name.endswith('.sub') and value[0] == 0o040000))
    log.info('Updating record %s in %s, value=%s',
             name, repr(self.path), value)
    if self.sub_id is None:
        # No sub-tree yet: start from an empty one.
        sub = dulwich.objects.Tree()
    else:
        sub = self.storage.git.tree(self.sub_id)
    key = quote(name)
    if value[1] is None:
        del sub[key]
    else:
        sub[key] = value
    self.sub_id = sub.id
    self.storage.git.object_store.add_object(sub)
    self.parent.update_sub(self.name + '.sub',
                           (0o040000, self.sub_id))
# Attribute declarations built with serializable_property(name, description):
# the first argument repeats the attribute name, the second is its
# human-readable description.
# NOTE(review): these read like class-level attributes whose class header is
# outside the visible lines; what serializable_property does beyond taking
# these two strings is not shown here -- verify against its definition.

# Timezone associated with the author timestamp.
author_timezone = serializable_property(
"author_timezone", "Returns the zone the author time is in.")
# Encoding of the commit message.
encoding = serializable_property(
"encoding", "Encoding of the commit message.")
# Signed tag associated with the object.
mergetag = serializable_property(
"mergetag", "Associated signed tag.")
# GPG signature.
gpgsig = serializable_property(
"gpgsig", "GPG Signature.")
# Every concrete object class known to this module.
OBJECT_CLASSES = (Commit, Tree, Blob, Tag)

# Lookup table resolving a class from either its ``type_name`` or its
# ``type_num``.
_TYPE_MAP = {}
for cls in OBJECT_CLASSES:
    _TYPE_MAP.update({cls.type_name: cls, cls.type_num: cls})
# Hold on to the pure-python implementations for testing: these private
# aliases keep the Python versions reachable under their own names.
# NOTE(review): the `try:` that follows is commented as importing C versions,
# which presumably rebind the public names -- its body is not visible here.
_parse_tree_py = parse_tree
_sorted_tree_items_py = sorted_tree_items
try:
# Try to import C versions
def _get_current_tree(self):
    """Return the tree that ``self.ref`` currently resolves to.

    A missing ref (``KeyError`` from the repository lookup) yields a new,
    empty ``Tree``.  When the ref resolves to a ``Tree`` it is returned
    directly; otherwise the object's ``tree`` attribute is looked up in the
    repository's object store.
    """
    try:
        resolved = self.repo[self.ref]
    except KeyError:
        # Nothing stored under this ref yet.
        return Tree()

    if not isinstance(resolved, Tree):
        # Not a tree itself: follow its ``tree`` reference.
        return self.repo.object_store[resolved.tree]
    return resolved
def get_objects_in_tree(self, treeId):
    """Return the tree ``treeId`` and every object reachable beneath it.

    The result is a flat list that starts with the tree itself.  Each entry
    of the tree is resolved through ``self.repo.get_object``: sub-trees are
    expanded recursively (and contribute themselves plus their contents),
    any other object is appended as-is.
    """
    root = self.repo.get_object(treeId)
    collected = [root]
    for entry in root.entries():
        # The third field of an entry is the sha of the referenced object.
        child_id = entry[2]
        child = self.repo.get_object(child_id)
        if isinstance(child, Tree):
            collected.extend(self.get_objects_in_tree(child_id))
        else:
            collected.append(child)
    return collected