Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def step_impl_given(context):
context.test_git_repo_dir = tempfile.mkdtemp("paasta_tools_deployments_json_itest")
context.test_git_repo = Repo.init(context.test_git_repo_dir)
paasta_print("Temp repo in %s" % context.test_git_repo_dir)
blob = Blob.from_string(b"My file content\n")
tree = Tree()
tree.add(b"spam", 0o0100644, blob.id)
commit = Commit()
commit.author = commit.committer = b"itest author"
commit.commit_time = commit.author_time = int(time())
commit.commit_timezone = commit.author_timezone = parse_timezone(b"-0200")[0]
commit.message = b"Initial commit"
commit.tree = tree.id
object_store = context.test_git_repo.object_store
object_store.add_object(blob)
object_store.add_object(tree)
object_store.add_object(commit)
context.test_git_repo.refs[b"refs/heads/master"] = commit.id
context.expected_commit_as_bytes = commit.id
context.expected_commit = context.expected_commit_as_bytes.decode()
def create_commit(data, marker=b'Default', blob=None):
if not blob:
blob = Blob.from_string(b'The blob content ' + marker)
tree = Tree()
tree.add(b"thefile_" + marker, 0o100644, blob.id)
cmt = Commit()
if data:
assert isinstance(data[-1], Commit)
cmt.parents = [data[-1].id]
cmt.tree = tree.id
author = b"John Doe " + marker + b" "
cmt.author = cmt.committer = author
tz = parse_timezone(b'-0200')[0]
cmt.commit_time = cmt.author_time = int(time())
cmt.commit_timezone = cmt.author_timezone = tz
cmt.encoding = b"UTF-8"
cmt.message = b"The commit message " + marker
tag = Tag()
tag.tagger = b"john@doe.net"
tag.message = b"Annotated tag"
tag.tag_timezone = parse_timezone(b'-0200')[0]
tag.tag_time = cmt.author_time
tag.object = (Commit, cmt.id)
tag.name = b"v_" + marker + b"_0.1"
))
tree = repo.object_store[repo.object_store[parent_commit_id].tree]
# get setup.py
setuppy_mode, setuppy_id = tree['setup.py']
setuppy = repo.object_store[setuppy_id]
release_setup = Blob.from_string(replace_version(setuppy.data, 'version',
str(new_version)))
tree['setup.py'] = (setuppy_mode, release_setup.id)
objects_to_add.append(release_setup)
objects_to_add.append(tree)
now = int(time.time())
new_commit = Commit()
new_commit.parents = [parent_commit_id]
new_commit.tree = tree.id
new_commit.author = author
new_commit.committer = author
new_commit.commit_time = now
new_commit.author_time = now
now = int(time.time())
offset = tzlocal().utcoffset(datetime.utcfromtimestamp(now))
timezone = offset.days * 24 * 60 * 60 + offset.seconds
new_commit.commit_timezone = timezone
new_commit.author_timezone = timezone
def commit_handler(self, cmd):
"""Process a CommitCommand."""
commit = Commit()
if cmd.author is not None:
author = cmd.author
else:
author = cmd.committer
(author_name, author_email, author_timestamp, author_timezone) = author
(committer_name, committer_email, commit_timestamp,
commit_timezone) = cmd.committer
commit.author = author_name + b" <" + author_email + b">"
commit.author_timezone = author_timezone
commit.author_time = int(author_timestamp)
commit.committer = committer_name + b" <" + committer_email + b">"
commit.commit_timezone = commit_timezone
commit.commit_time = int(commit_timestamp)
commit.message = cmd.message
commit.parents = []
if cmd.from_:
#make the blob
blob = Blob.from_string(content)
repo = Repo(self.package.path())
#change to the right branch
last_head = repo.head()
set_head(repo, "master")
last_commit = repo.get_object(repo.head())
tree = repo.tree(repo.get_object(repo.head()).tree)
#set the file
tree[self.filename] = (0100644, blob.id)
#make the commit
commit = Commit()
commit.tree = tree.id
commit.parents = [last_head]
commit.author = commit.committer = username
commit.commit_time = commit.author_time = int(time.time())
commit.commit_timezone = commit.author_timezone = parse_timezone("-0600")
commit.encoding = "UTF-8"
commit.message = "not implemented yet"
repo.object_store.add_object(blob)
repo.object_store.add_object(tree)
repo.object_store.add_object(commit)
repo.refs["refs/heads/" + branch] = commit.id
repo.refs["HEAD"] = "ref: refs/heads/" + branch
new_link = "<a href="\"/package/"">go to the new version</a>"
self.new_link = new_link
def parse_patch_message(msg, encoding=None):
"""Extract a Commit object and patch from an e-mail message.
Args:
msg: An email message (email.message.Message)
encoding: Encoding to use to encode Git commits
Returns: Tuple with commit object, diff contents and git version
"""
c = Commit()
c.author = msg["from"].encode(encoding)
c.committer = msg["from"].encode(encoding)
try:
patch_tag_start = msg["subject"].index("[PATCH")
except ValueError:
subject = msg["subject"]
else:
close = msg["subject"].index("] ", patch_tag_start)
subject = msg["subject"][close+2:]
c.message = (subject.replace("\n", "") + "\n").encode(encoding)
first = True
body = msg.get_payload(decode=True)
lines = body.splitlines(True)
line_iter = iter(lines)
def export_hg_commit(self, rev, exporter):
self.ui.note(_("converting revision %s\n") % hex(rev))
oldenc = self.swap_out_encoding()
ctx = self.repo.changectx(rev)
extra = ctx.extra()
commit = Commit()
(time, timezone) = ctx.date()
# work around to bad timezone offets - dulwich does not handle
# sub minute based timezones. In the one known case, it was a
# manual edit that led to the unusual value. Based on that,
# there is no reason to round one way or the other, so do the
# simplest and round down.
timezone -= timezone % 60
commit.author = self.get_git_author(ctx)
commit.author_time = int(time)
commit.author_timezone = -timezone
if "committer" in extra:
try:
# fixup timezone
(name, timestamp, timezone) = extra["committer"].rsplit(" ", 2)
trees.append(obj)
tree = obj
except KeyError:
break
# Cut down the blob and all rotten trees on the way back...
for path, tree in reversed(zip(paths, trees)):
del tree[path]
if tree:
# This tree still has elements - don't remove it or any
# of it's parents
break
object_store.add_object(commit_tree)
# Create commit
commit = objects.Commit()
commit.tree = commit_tree.id
commit.parents = [p._commit.id for p in self.parents if p]
commit.author = commit.committer = safe_str(author)
commit.encoding = ENCODING
commit.message = safe_str(message)
# Compute date
if date is None:
date = time.time()
elif isinstance(date, datetime.datetime):
date = time.mktime(date.timetuple())
author_time = kwargs.pop('author_time', date)
commit.commit_time = int(date)
commit.author_time = int(author_time)
tz = time.timezone
gitsha = handler.map_git_get(hgctx.hex())
if not gitsha:
# TODO deal better with commits in the middle of octopus merges
raise hgutil.Abort(
_("no git commit found for rev %s") % hgctx,
hint=_("if this is an octopus merge, " "verify against the last rev"),
)
try:
gitcommit = handler.git.get_object(gitsha)
except KeyError:
raise hgutil.Abort(
_("git equivalent %s for rev %s not found!") % (gitsha, hgctx)
)
if not isinstance(gitcommit, Commit):
raise hgutil.Abort(
_("git equivalent %s for rev %s is not a commit!") % (gitsha, hgctx)
)
ui.status(_("verifying rev %s against git commit %s\n") % (hgctx, gitsha))
failed = False
# TODO check commit message and other metadata
dirkind = stat.S_IFDIR
hgfiles = set(hgctx)
gitfiles = set()
i = 0
with progress.bar(ui, _("verify"), total=len(hgfiles)) as prog:
"""Print the changes in a commit.
:param repo: Path to repository
:param objects: Objects to show (defaults to [HEAD])
:param outstream: Stream to write to
:param default_encoding: Default encoding to use if none is set in the
commit
"""
if objects is None:
objects = ["HEAD"]
if not isinstance(objects, list):
objects = [objects]
with open_repo_closing(repo) as r:
for objectish in objects:
o = parse_object(r, objectish)
if isinstance(o, Commit):
def decode(x):
return commit_decode(o, x, default_encoding)
else:
def decode(x):
return x.decode(default_encoding)
show_object(r, o, decode, outstream)