Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Fragment of a fixture builder (its `def` line is not part of this listing):
# creates a blob, a tree holding that blob, a commit for the tree and a tag
# pointing at the commit, labelling each with `marker` via %-formatting.
# NOTE(review): indentation was stripped from this listing; presumably only
# the assert and the `cmt.parents` assignment belong to `if data:` -- confirm.
blob = Blob.from_string('The blob content %s' % marker)
tree = Tree()
# One tree entry: a name derived from marker, mode 0o100644, and the blob's id.
tree.add("thefile_%s" % marker, 0o100644, blob.id)
cmt = Commit()
# When `data` is non-empty its last element must be a Commit; that commit's
# id becomes the only parent of the new commit.
if data:
assert isinstance(data[-1], Commit)
cmt.parents = [data[-1].id]
cmt.tree = tree.id
author = "John Doe %s " % marker
# Author and committer are set to the same identity string.
cmt.author = cmt.committer = author
# Only element 0 of parse_timezone's result is kept.
tz = parse_timezone('-0200')[0]
# Commit time and author time share one int(time()) value.
cmt.commit_time = cmt.author_time = int(time())
cmt.commit_timezone = cmt.author_timezone = tz
cmt.encoding = "UTF-8"
cmt.message = "The commit message %s" % marker
tag = Tag()
tag.tagger = "john@doe.net"
tag.message = "Annotated tag"
tag.tag_timezone = parse_timezone('-0200')[0]
# The tag reuses the commit's author time as its own timestamp.
tag.tag_time = cmt.author_time
# Tag target: the Commit class paired with the id of the commit built above.
tag.object = (Commit, cmt.id)
tag.name = "v_%s_0.1" % marker
# Returned in the order blob, tree, tag, commit.
return blob, tree, tag, cmt
# Fragment of a fixture builder (its `def` line is not part of this listing):
# creates a blob, a tree holding that blob, a commit for the tree and a tag
# pointing at the commit. Every literal here is a bytes object and `marker`
# is joined in with `+`, so marker must itself be bytes for this to run.
# NOTE(review): indentation was stripped from this listing; presumably only
# the assert and the `cmt.parents` assignment belong to `if data:` -- confirm.
blob = Blob.from_string(b'The blob content ' + marker)
tree = Tree()
# One tree entry: a name derived from marker, mode 0o100644, and the blob's id.
tree.add(b"thefile_" + marker, 0o100644, blob.id)
cmt = Commit()
# When `data` is non-empty its last element must be a Commit; that commit's
# id becomes the only parent of the new commit.
if data:
assert isinstance(data[-1], Commit)
cmt.parents = [data[-1].id]
cmt.tree = tree.id
author = b"John Doe " + marker + b" "
# Author and committer are set to the same identity bytes.
cmt.author = cmt.committer = author
# Only element 0 of parse_timezone's result is kept.
tz = parse_timezone(b'-0200')[0]
# Commit time and author time share one int(time()) value.
cmt.commit_time = cmt.author_time = int(time())
cmt.commit_timezone = cmt.author_timezone = tz
cmt.encoding = b"UTF-8"
cmt.message = b"The commit message " + marker
tag = Tag()
tag.tagger = b"john@doe.net"
tag.message = b"Annotated tag"
tag.tag_timezone = parse_timezone(b'-0200')[0]
# The tag reuses the commit's author time as its own timestamp.
tag.tag_time = cmt.author_time
# Tag target: the Commit class paired with the id of the commit built above.
tag.object = (Commit, cmt.id)
tag.name = b"v_" + marker + b"_0.1"
# Returned in the order blob, tree, tag, commit.
return blob, tree, tag, cmt
def __init__(self, repository, revision):
    """Bind this object to the commit that ``revision`` names.

    The object is looked up in ``repository._repo``. When the lookup
    yields a tag object, both the stored id and the stored commit are
    replaced by the tag's target (``object[1]``).

    :param repository: object exposing ``_repo`` and ``revisions``
    :param revision: key used for the lookup in ``repository._repo``
    :raises RepositoryError: when a lookup raises ``KeyError``
    """
    self._stat_modes = {}
    self.repository = repository

    try:
        git_object = self.repository._repo[revision]
        if isinstance(git_object, objects.Tag):
            # Peel the tag: carry on with the id it points at and the
            # object stored under that id.
            revision = git_object.object[1]
            git_object = self.repository._repo.get_object(revision)
    except KeyError:
        raise RepositoryError("Cannot get object with id %s" % revision)

    # Identity attributes, all derived from the (possibly peeled) id.
    self.raw_id = revision
    self.id = self.raw_id
    self.short_id = self.raw_id[:12]

    self._commit = git_object
    self._tree_id = git_object.tree

    # Attribute names to read from the stored commit for each piece of
    # metadata.
    self._committer_property = 'committer'
    self._author_property = 'author'
    self._date_property = 'commit_time'
    self._date_tz_property = 'commit_timezone'

    # Position of this id within the repository's revision list.
    self.revision = repository.revisions.index(revision)
    self.nodes = {}
def _get_object(self, sha, cls):
assert len(sha) in (20, 40)
ret = self.get_object(sha)
if not isinstance(ret, cls):
if cls is Commit:
raise NotCommitError(ret)
elif cls is Blob:
raise NotBlobError(ret)
elif cls is Tree:
raise NotTreeError(ret)
elif cls is Tag:
raise NotTagError(ret)
else:
raise Exception("Type invalid: %r != %r" % (
ret.type_name, cls.type_name))
return ret
def next(self):
# Pop entries from `self.objects_to_send` until one whose sha is not yet in
# `self.sha_done` turns up, queue follow-up entries for it through
# `self.add_todo`, record it as done, report progress, and return it as
# `(sha, name)`. Returns None as soon as `objects_to_send` is empty.
# NOTE(review): indentation was stripped from this listing. It is not
# recoverable from here whether the `self._tagged` check below sits inside
# `if not leaf:` or at the same level as it -- confirm against the original.
while True:
if not self.objects_to_send:
return None
(sha, name, leaf) = self.objects_to_send.pop()
# Leave the loop on the first sha that has not been handled before.
if sha not in self.sha_done:
break
if not leaf:
# info[0] is compared against the type_num of Commit / Tree / Tag; which
# other element of `info` gets queued depends on that type.
info = self.object_store.pack_info_get(sha)
if info[0] == Commit.type_num:
self.add_todo([(info[2], "", False)])
elif info[0] == Tree.type_num:
# Each element of info[1] is turned into a tuple and queued as-is.
self.add_todo([tuple(i) for i in info[1]])
elif info[0] == Tag.type_num:
self.add_todo([(info[1], None, False)])
# A sha present in `self._tagged` also queues the mapped value, marked as a leaf.
if sha in self._tagged:
self.add_todo([(self._tagged[sha], None, True)])
self.sha_done.add(sha)
# The progress message carries the running size of `sha_done`.
self.progress("counting objects: %d\r" % len(self.sha_done))
return (sha, name)
def _get_parsed_refs(self):
# cache the property
_repo = self._repo
refs = _repo.get_refs()
keys = [('refs/heads/', 'H'),
('refs/remotes/origin/', 'RH'),
('refs/tags/', 'T')]
_refs = {}
for ref, sha in refs.iteritems():
for k, type_ in keys:
if ref.startswith(k):
_key = ref[len(k):]
if type_ == 'T':
obj = _repo.get_object(sha)
if isinstance(obj, Tag):
sha = _repo.get_object(sha).object[1]
_refs[_key] = [sha, type_]
break
return _refs
def tag_handler(self, cmd):
    """Process a TagCommand.

    Builds a ``Tag`` from the command's ``tagger``, ``message`` and
    ``tag`` attributes, adds it to ``self.repo`` and points
    ``refs/tags/<name>`` at the new tag's id.

    :param cmd: command object providing ``tagger``, ``message`` and ``tag``
    """
    repo = self.repo
    new_tag = Tag()
    new_tag.tagger, new_tag.message, new_tag.name = (
        cmd.tagger, cmd.message, cmd.tag)
    repo.add_object(new_tag)
    # The ref name is built from the name as stored on the tag object.
    ref_name = "refs/tags/" + new_tag.name
    repo.refs[ref_name] = new_tag.id
# Fragment of a ref-lookup routine (its `def` line and the initial
# `candidates` list are not part of this listing): tries each candidate ref
# name in order and returns the sha of the first one present in
# `self._repo.refs`.
# NOTE(review): indentation was stripped from this listing, so the nesting
# of the statements below has to be confirmed against the original.
# If the reference points explicitly to the origin remote,
# but that remote isn't present (as it won't be when zuul
# configures the repo in CI), then we want the shortened
# form of the reference. We put this option last in the
# list because we want the more explicit name to be used
# when someone is running reno locally with a more
# standard git configuration.
if name.startswith('origin/'):
candidates.append('refs/heads/' + name.partition('/')[-1])
for ref in candidates:
LOG.debug('looking for ref {!r} as {!r}'.format(name, ref))
# Candidates are text strings; the lookup in `refs` uses their UTF-8 bytes.
key = ref.encode('utf-8')
if key in self._repo.refs:
sha = self._repo.refs[key]
o = self._repo[sha]
if isinstance(o, objects.Tag):
# Branches point directly to commits, but
# signed tags point to the signature and we
# need to dereference it to get to the commit.
sha = o.object[1]
LOG.info('found ref {!r} as {!r} at {}'.format(
name, ref, sha))
return sha
# If we end up here we didn't find any of the candidates.
raise ValueError('Unknown reference {!r}'.format(name))
# NOTE(review): presumably this return belongs to a branch taken when no
# name was given (it would be unreachable directly after the raise) -- confirm.
return self._repo.refs[b'HEAD']
# Fragment of a commit-parsing routine (its `def` line is not part of this
# listing): sorts each (field, value) pair yielded by `_parse_message(chunks)`
# into the matching local and returns all of them as one 9-tuple.
# NOTE(review): `tree`, `parents`, `author_info`, `commit_info`, `encoding`,
# `mergetag`, `message` and `extra` are presumably initialised just above
# this fragment -- confirm. Indentation was stripped from this listing.
gpgsig = None
for field, value in _parse_message(chunks):
# TODO(jelmer): Enforce ordering
if field == _TREE_HEADER:
tree = value
elif field == _PARENT_HEADER:
# Parent values accumulate in a list; a later one does not replace an earlier one.
parents.append(value)
elif field == _AUTHOR_HEADER:
author_info = parse_time_entry(value)
elif field == _COMMITTER_HEADER:
commit_info = parse_time_entry(value)
elif field == _ENCODING_HEADER:
encoding = value
elif field == _MERGETAG_HEADER:
# A newline is appended to the value before it is parsed as a Tag.
mergetag.append(Tag.from_string(value + b'\n'))
elif field == _GPGSIG_HEADER:
gpgsig = value
elif field is None:
# A pair whose field is None carries the commit message.
message = value
else:
# Unrecognised fields are kept as (field, value) pairs.
extra.append((field, value))
return (tree, parents, author_info, commit_info, encoding, mergetag,
gpgsig, message, extra)