Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __post_init__(self):
    """Attach a repository object for ``self.repo_path`` to the instance.

    The resolved path is first handed to ``self.git.init``.  If that raises
    ``FileExistsError``, the repository already at that path is opened with
    ``Repo`` instead.  Either way the result ends up on
    ``self.dulwich_repo``.
    """
    resolved = str(self.repo_path.resolve())
    try:
        repository = self.git.init(resolved)
    except FileExistsError:
        # Something is already there: reuse it rather than failing.
        logger.info(f"Using existing repository at the following path: {self.repo_path}")
        repository = Repo(resolved)
    self.dulwich_repo = repository
def get_recent_tags(projdir=PROJDIR):
"""Get list of tags in order from newest to oldest and their datetimes.
Args:
projdir: path to ``.git``; opened with ``Repo`` as a context manager
Returns:
list of tags sorted by commit time from newest to oldest
Each tag in the list contains the tag name, commit time, commit id, author
and any tag meta. If a tag isn't annotated, then its tag meta is ``None``.
Otherwise the tag meta is a tuple containing the tag time, tag id and tag
name. Time is in UTC.
"""
with Repo(projdir) as project: # dulwich repository object
refs = project.get_refs() # dictionary of refs and their SHA-1 values
tags = {} # empty dictionary to hold tags, commits and datetimes
# iterate over refs in repository
for key, value in refs.items():
# ref names are decoded to text before the string tests below
key = key.decode('utf-8') # compatible with Python-3
# NOTE(review): the object is looked up for every ref, before the
# non-tag refs are filtered out just below.
obj = project.get_object(value) # dulwich object from SHA-1
# don't just check if object is "tag" b/c it could be a "commit"
# instead check if "tags" is in the ref-name
# NOTE(review): this is a substring test on the whole ref name, so any
# ref containing "tags" passes - confirm that is intended.
if u'tags' not in key:
# skip ref if not a tag
continue
# strip the leading text from refs to get "tag name"
# (everything after the last "/" in the ref name)
_, tag = key.rsplit(u'/', 1)
# check if tag object is "commit" or "tag" pointing to a "commit"
try:
commit = obj.object # a tuple (commit class, commit id)
# Local import of the helper whose result is used as the key into
# ``self.files_current`` for this path.
from website.addons.osffiles.utils import urlsafe_filename
file_name_key = urlsafe_filename(path)
# The repository directory is UPLOADS_PATH joined with this object's primary key.
repo_path = os.path.join(settings.UPLOADS_PATH, self._primary_key)
# TODO make sure it all works, otherwise rollback as needed
# Do a git delete, which also removes from working filesystem.
try:
# Argument list with shell=False: ``path`` is passed to git as a single
# argument and is never interpreted by a shell.
subprocess.check_output(
['git', 'rm', path],
cwd=repo_path,
shell=False
)
# Commit the removal, using the committer derived from ``auth``.
repo = Repo(repo_path)
message = '{path} deleted'.format(path=path)
committer = self._get_committer(auth)
repo.do_commit(message, committer)
except subprocess.CalledProcessError as error:
# Exit status 128 from ``git rm`` is reported as a missing file; any
# other non-zero status is re-raised unchanged.
# NOTE(review): FileNotFoundError may be a project-defined exception
# here rather than the builtin - confirm against the file's imports.
if error.returncode == 128:
raise FileNotFoundError('File {0!r} was not found'.format(path))
raise
# If a current record exists for this key, load it, flag it as deleted and
# save it.
if file_name_key in self.files_current:
nf = NodeFile.load(self.files_current[file_name_key])
nf.is_deleted = True
nf.save()
# pop() with a default, so a key that is already absent is not an error.
self.files_current.pop(file_name_key, None)
def read_submodule_head(path):
    """Read the head commit of a submodule.

    The repository is opened only for the duration of the lookup and is
    closed again before returning.

    Args:
      path: path to the submodule, as ``str`` or as ``bytes`` in the
        filesystem encoding
    Returns: HEAD sha, None if not a valid head/repository
    """
    from dulwich.errors import NotGitRepository
    from dulwich.repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(jelmer): Perhaps move this into Repo() ?
    if not isinstance(path, str):
        path = path.decode(sys.getfilesystemencoding())
    try:
        repo = Repo(path)
    except NotGitRepository:
        return None
    # Use the repository as a context manager so it is closed on every exit
    # path instead of being left open for the garbage collector.
    with repo:
        try:
            return repo.head()
        except KeyError:
            # No resolvable HEAD in this repository.
            return None
def __init__(self, repodir, storage_root_subdir=os.curdir):
    """Open the repository at ``repodir`` and record the storage subdirectory.

    Args:
      repodir: passed straight to ``Repo``; the result is kept as
        ``self.repo``
      storage_root_subdir: kept as ``self.storage_root_subdir``; defaults
        to ``os.curdir``
    """
    self.repo = Repo(repodir)
    self.storage_root_subdir = storage_root_subdir
# --unmerged is a plain flag: it stores True when present (store_const).
# NOTE(review): assuming ``parser`` is an argparse parser, the value is None
# rather than False when the flag is absent - confirm nothing compares it
# against False.
parser.add_argument('--unmerged', action='store_const', const=True,
help='show only unmerged pull-requests')
args = parser.parse_args()
def print_verbose(str, *argv, **kwargs):
    """Write a progress message to stderr when verbose mode is on.

    The message is prefixed with ``"... "``.  Extra positional and keyword
    arguments are forwarded to ``print``.  Nothing is written unless the
    module-level ``verbose`` flag is truthy.
    """
    # NOTE: the first parameter shadows the builtin ``str`` inside this body.
    if not verbose:
        return
    message = "... {}".format(str)
    print(message, *argv, file=sys.stderr, **kwargs)
# Credentials and flags pulled from the config file and the command line.
u = config['main']['user']
s = config['main']['token']
verbose = args.verbose
unmerged = args.unmerged
sys.stderr.flush()

# Locate the enclosing Git repository and report where its pieces live.
repo = Repo.discover()
print_verbose("Found Git repository {}".format(repo))
print_verbose(" control dir is {}".format(repo.controldir()))
# Only report the common dir when this dulwich exposes ``commondir``.
if hasattr(repo, 'commondir'):
    print_verbose(" common dir is {}".format(repo.commondir()))
else:
    print_verbose("dulwich does not know repo.commondir()")
print_verbose(" index path is {}".format(repo.index_path()))

# Walk the commits reachable from HEAD but not from the cgal/master
# remote-tracking ref.
print_verbose("Getting commits in current branch", end='')
print_verbose(
    " {}".format(str(repo.refs.read_ref(b'HEAD'), "ascii")),
    end='',
)
sys.stderr.flush()
walk = repo.get_walker(
    include=repo.head(),
    exclude=[repo.get_refs()[b'refs/remotes/cgal/master']],
)
# doesn't match hg's bytes-only view of filesystems, we just
# have to cope with that. As a workaround, try decoding our
# (bytes) path to the repo in hg's active encoding and hope
# for the best.
gitpath = self.gitdir.decode(encoding.encoding, encoding.encodingmode)
# make the git data directory
# The existence check and mkdir use the original ``self.gitdir``; only the
# Repo calls receive the decoded ``gitpath``.
if os.path.exists(self.gitdir):
# Already present: open it as-is.
return Repo(gitpath)
else:
# Set disallowinitbare to prevent hggit from creating a .hg/git
# directory. This is useful when the .hg/git directory should be
# managed externally.
if self.ui.configbool("hggit", "disallowinitbare"):
raise error.Abort(_("missing .hg/git repo"))
# Otherwise create the directory and initialise a bare repository in it.
os.mkdir(self.gitdir)
return Repo.init_bare(gitpath)
# Reject any target path that does not begin with REPO_PATH followed by the
# repository host name.
# NOTE(review): this looks like a plain string-prefix test, not a
# path-component comparison - confirm repo_path is normalised before it
# gets here.
if not repo_path.startswith(REPO_PATH + repo_hostname):
self._logError(
f"Invalid path: {self.repo_url} + {self.repo_name} => {repo_path}"
)
raise Exception("Can't clone repo. Invalid repository.")
# Clone only when there is no directory at repo_path yet; an existing
# directory is reused without being re-cloned.
if not os.path.isdir(repo_path):
# Using GitPython here since Dulwich was throwing weird errors like:
# "IOError: Not a gzipped file" when fetching resources like:
# https://git.centos.org/r/rpms/dhcp.git
# A falsy result from clone_from is treated as a failed clone.
if not GitPythonRepo.clone_from(
self.repo_url, repo_path, bare=True):
self._logError(f"Can't clone repo {self.repo_name}.")
raise Exception("Can't clone repo.")
# Open the repository at repo_path with ``Repo`` and keep it on the instance.
self.repo = Repo(repo_path)
return True
def open_repo_closing(path_or_repo):
    """Return a context manager that yields a repository for the argument.

    Given a path, the repository is opened here and closed when the context
    exits.  Given an object that is already a repository, it is wrapped in
    a no-op context manager, so it is handed back as-is and the caller
    remains responsible for closing it.
    """
    if not isinstance(path_or_repo, BaseRepo):
        # A path: we open the repository, so we also close it on exit.
        opened = Repo(path_or_repo)
        return closing(opened)
    return _noop_context_manager(path_or_repo)