Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Test helper: build in-memory Blob, Tree, Commit and Tag objects whose
# contents are derived from ``marker``.
#
#   data   -- sequence of previously created objects; when non-empty, its
#             last element must be a Commit and becomes the only parent of
#             the new commit.
#   marker -- bytes suffix mixed into the blob content, the tree entry name,
#             the author string and the commit message.
#   blob   -- optional ready-made Blob; when falsy a default Blob is built
#             from ``marker``.
#
# NOTE(review): this excerpt has lost its indentation and appears to end
# mid-function -- no return statement is visible here. Confirm against the
# complete source before relying on the return value.
def create_commit(data, marker=b'Default', blob=None):
if not blob:
blob = Blob.from_string(b'The blob content ' + marker)
# The tree holds a single entry pointing at the blob.
tree = Tree()
tree.add(b"thefile_" + marker, 0o100644, blob.id)
cmt = Commit()
if data:
# Chain the new commit onto the most recent object in ``data``.
assert isinstance(data[-1], Commit)
cmt.parents = [data[-1].id]
cmt.tree = tree.id
# NOTE(review): the trailing b" " looks like the remains of a longer author
# string (e.g. an e-mail part lost in extraction) -- verify.
author = b"John Doe " + marker + b" "
cmt.author = cmt.committer = author
# Only the first element of parse_timezone()'s result is used as the offset.
tz = parse_timezone(b'-0200')[0]
# Author and commit timestamps are both "now", truncated to whole seconds.
cmt.commit_time = cmt.author_time = int(time())
cmt.commit_timezone = cmt.author_timezone = tz
cmt.encoding = b"UTF-8"
cmt.message = b"The commit message " + marker
tag = Tag()
tag.tagger = b"john@doe.net"
def test_add_thin_pack_ext_ref(self):
    """Add a thin pack whose blob is a REF_DELTA against a stored object.

    A first pack is put into the Swift object store, then a second pack is
    built in memory. Its blob entry is expressed as a delta against the
    first blob already in the store. After ``add_thin_pack`` the backing
    fake store is expected to hold six entries.

    All markers and replacement patterns are bytes literals:
    ``as_raw_string()`` yields bytes and ``create_commit`` concatenates the
    marker with bytes, so ``str`` arguments raise ``TypeError`` on Python 3.
    On Python 2 a bytes literal is the same type as a plain string literal.
    """
    sos = swift.SwiftObjectStore(self.fsc)
    # presumably _put_pack forwards the marker to the commit helper, which
    # needs bytes -- verify against _put_pack's definition.
    odata = self._put_pack(sos, 1, b'Default1')
    ref_blob_content = odata[0].as_raw_string()
    ref_blob_id = odata[0].id

    # Derive a new blob from the stored one so it can be sent as a delta.
    new_blob = Blob.from_string(
        ref_blob_content.replace(b'blob', b'yummy blob'))
    blob, tree, tag, cmt = create_commit(
        [], marker=b'Default2', blob=new_blob)

    # The blob is the only entry encoded as a delta against an external ref.
    data = [
        (REF_DELTA, (ref_blob_id, blob.as_raw_string())),
        (tree.type_num, tree.as_raw_string()),
        (cmt.type_num, cmt.as_raw_string()),
        (tag.type_num, tag.as_raw_string()),
    ]
    f = BytesIO()
    build_pack(f, data, store=sos)
    sos.add_thin_pack(f.read, None)
    self.assertEqual(len(self.fsc.store), 6)
# Fragment from the body of a handler (the error message names it
# "FileViewer.edit"); the enclosing def and the original indentation are not
# part of this excerpt, so the nesting of the lines below is uncertain.
#
# NOTE(review): ``raise ValueError, "..."`` and the ``0100644`` literal
# further down are Python 2 only syntax.
# NOTE(review): ``== None`` comparisons would normally be ``is None``.
if cherrypy.session.login==None: raise ValueError, "FileViewer.edit: no username supplied"
elif username==None or username=="anonymous":
anon = True #whether or not the user is anonymous
# A logged-in session overrides the supplied username.
if SESSION_KEY in cherrypy.session.keys():
username = cherrypy.session[SESSION_KEY].username
anon = False
else: username = "anonymous"
#at least until we get access control lists working
if anon:
if branch=="master": #don't let anonymous users modify branch "master"
branch = "anonymous"
# NOTE(review): this second assignment repeats the line above; with the
# indentation lost it is unclear which branch it belonged to -- confirm.
branch = "anonymous"
#make the blob
blob = Blob.from_string(content)
repo = Repo(self.package.path())
#change to the right branch
# The head is remembered before switching; it is used as the commit parent.
last_head = repo.head()
# NOTE(review): the literal "master" is passed here, not ``branch`` -- confirm
# this is intended given the comment above.
set_head(repo, "master")
last_commit = repo.get_object(repo.head())
tree = repo.tree(repo.get_object(repo.head()).tree)
#set the file
tree[self.filename] = (0100644, blob.id)
#make the commit
commit = Commit()
commit.tree = tree.id
commit.parents = [last_head]
commit.author = commit.committer = username
def _set_data(self, value):
    """Replace the content of this blob with ``value``.

    Logs the update, clears the stored git id, wraps ``value`` in a new
    dulwich Blob and reports the change to the parent through
    ``_set_dirty``.
    """
    log.debug('blob %r: updating value', self.name)
    # The previous id is dropped before the new blob is built.
    self._git_id = None
    fresh_blob = dulwich.objects.Blob.from_string(value)
    self._git_blob = fresh_blob
    self.parent._set_dirty(self.name, self)
# NOTE(review): fragment from the middle of a tree-merge routine; the
# enclosing def, the loop/condition this code sits in and the original
# indentation are not part of this excerpt.
# 'new' path. Note that the file will be finally stored
# under the name given by the last rename.
# A rename on the other side: reload its JSON from the new path and store
# the result under that path.
if other_changes.type == CHANGE_RENAME:
other_json = load_json(other_changes.new.path, other)
path = other_changes.new.path
# A rename on my side is checked second, so its path wins when both renamed.
if my_changes.type == CHANGE_RENAME:
my_json = load_json(my_changes.new.path, mine)
path = my_changes.new.path
if take_mine:
# Take the first truthy version, preferring mine, then other, then base.
merged_json = my_json or other_json or base_json
else:
# merge_jsons returns the merged document plus a conflict flag.
merged_json, merge_conflict = merge_jsons(*jsons)
# NOTE(review): presumably the conflict bookkeeping below belongs to the
# ``else`` branch above, since ``merge_conflict`` is only assigned there in
# this excerpt -- confirm against the indented source.
if merge_conflict:
conflicts[path] = merged_json
had_conflict = had_conflict or merge_conflict
# Serialise with sorted keys and 4-space indent, store the blob, and add it
# to the merged tree.
merged_blob = Blob.from_string(
dumps(merged_json, sort_keys=True, indent=4))
self._update_store(merged_blob)
merge_tree.add(path, FILE_MODE, merged_blob.id)
# NOTE(review): this ``else`` pairs with a condition outside the excerpt.
else:
try:
# First truthy JSON among mine, other and base.
data = (load_json(path, mine) or load_json(path, other) or
load_json(path, base))
except ValueError: # Loading a non json file
# Non-JSON content is taken verbatim from my side.
blob = Blob.from_string(load_raw(path, mine))
else:
# JSON content is re-serialised with sorted keys and 4-space indent.
blob = Blob.from_string(dumps(data, sort_keys=True,
indent=4))
self._update_store(blob)
merge_tree.add(path, FILE_MODE, blob.id)
# Store the finished tree and return it with the recorded conflicts.
self._update_store(merge_tree)
return merge_tree, conflicts
def tree_entry(fctx, blob_cache):
    """Build the dulwich TreeEntry for a single filectx.

    ``blob_cache`` maps file nodes to blob ids. On a cache miss a Blob is
    created from the file data and its id is recorded in the cache (this is
    the side effect on the passed mapping). On a hit no Blob is created.

    Returns a 2-tuple ``(dulwich.objects.TreeEntry, blob)`` where ``blob``
    is the newly created dulwich Blob, or ``None`` when the id came from the
    cache.
    """
    blob = None
    blob_id = blob_cache.get(fctx.filenode(), None)
    if blob_id is None:
        blob = dulobjs.Blob.from_string(fctx.data())
        blob_id = blob_cache[fctx.filenode()] = blob.id

    flags = fctx.flags()
    # Default mode applies when neither flag is present; "l" is checked
    # before "x", and the first match wins.
    mode = 0o100644
    for flag, flagged_mode in (("l", 0o120000), ("x", 0o100755)):
        if flag in flags:
            mode = flagged_mode
            break

    entry_name = os.path.basename(fctx.path())
    return (dulobjs.TreeEntry(entry_name, mode, blob_id), blob)
from klaus.utils import guess_is_binary
# NOTE(review): fragment from the body of a method that summarises the file
# changes of ``commit``; the enclosing def, the original indentation and the
# rest of the loop body are not part of this excerpt.
# Diff against the first parent only; a commit with no parents is diffed
# against ``None``.
if commit.parents:
parent_tree = self[commit.parents[0]].tree
else:
parent_tree = None
# Only 'nfiles' is updated within this excerpt.
summary = {'nfiles': 0, 'nadditions': 0, 'ndeletions': 0}
file_changes = [] # the changes in detail
dulwich_changes = self.object_store.tree_changes(parent_tree, commit.tree)
for (oldpath, newpath), (oldmode, newmode), (oldsha, newsha) in dulwich_changes:
summary['nfiles'] += 1
try:
# A falsy sha on either side is replaced by an empty blob.
oldblob = self.object_store[oldsha] if oldsha else Blob.from_string(b'')
newblob = self.object_store[newsha] if newsha else Blob.from_string(b'')
except KeyError:
# newsha/oldsha are probably related to submodules.
# Dulwich will handle that.
# NOTE(review): when the lookup raises, oldblob and/or newblob are not
# assigned in this iteration, yet they are read just below -- confirm this
# cannot hit an unbound or stale value.
pass
# Check for binary files -- can't show diffs for these
if guess_is_binary(newblob) or \
guess_is_binary(oldblob):
# Binary changes are recorded without chunks and the rest of the
# iteration is skipped.
file_changes.append({
'is_binary': True,
'old_filename': oldpath or '/dev/null',
'new_filename': newpath or '/dev/null',
'chunks': None
})
continue
def content(mode, hexsha):
    """Return the object to use as file content for one side of a change.

    * ``hexsha`` is ``None``: an empty Blob.
    * ``mode`` is a gitlink: a Blob containing ``Subproject commit <hexsha>``
      followed by a newline.
    * otherwise: whatever ``store`` holds under ``hexsha``.
    """
    if hexsha is None:
        return Blob.from_string(b'')
    if S_ISGITLINK(mode):
        subproject_line = b"Subproject commit " + hexsha + b"\n"
        return Blob.from_string(subproject_line)
    return store[hexsha]