Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_callback(self):
    """Expect ``fetch`` to raise when the credentials callback raises."""

    class RefusingCallbacks(pygit2.RemoteCallbacks):
        """Remote callbacks whose credentials hook always raises."""

        @staticmethod
        def credentials(url, username, allowed):
            # The allowed-types bitmask must include plaintext user/password.
            assert allowed & GIT_CREDTYPE_USERPASS_PLAINTEXT
            raise Exception("I don't know the password")

    github_remote = self.repo.create_remote(
        "github", "https://github.com/github/github"
    )
    with pytest.raises(Exception):
        github_remote.fetch(callbacks=RefusingCallbacks())
def test_clone_with_credentials(self):
    """Clone the test repository with user/password credentials and check it has content."""
    callbacks = pygit2.RemoteCallbacks(
        credentials=pygit2.UserPass("libgit2", "libgit2")
    )
    cloned = clone_repository(
        'https://github.com/libgit2/TestGitRepository',
        self._temp_dir,
        callbacks=callbacks,
    )
    assert not cloned.is_empty
import click
import toolz
from setuptools_scm.git import parse as parse_git_version
from ruamel.yaml import YAML
# github3 is treated as optional: if the import fails, the name is bound to
# `object` so that later references to `github3` do not raise NameError.
try:
import github3
except ImportError:
github3 = object
# pygit2 is treated as optional as well. PygitRemoteCallbacks is
# pygit2.RemoteCallbacks when the import succeeds and plain `object` otherwise.
# NOTE(review): the name `pygit2` itself stays undefined when the import fails.
try:
import pygit2
except ImportError:
PygitRemoteCallbacks = object
else:
PygitRemoteCallbacks = pygit2.RemoteCallbacks
# Absolute path of the directory that contains this file.
CWD = Path(__file__).parent.absolute()
# Section titles; presumably changelog headings -- confirm at the use sites.
NEW_FEATURE = 'New Features and Improvements'
BUGFIX = 'Bug Fixes'
def md(template, *args, **kwargs):
    """Wrap ``str.format`` with naive markdown escaping of the arguments.

    Every positional and keyword argument is escaped before it is substituted
    into *template*. The template itself is used verbatim, so markup written
    in the template stays active.

    Args:
        template: A ``str.format`` style template.
        *args: Positional string values to escape and substitute.
        **kwargs: Keyword string values to escape and substitute.

    Returns:
        The formatted string.
    """
    def escape(s):
        # The backslash is markdown's own escape character, so it has to be
        # escaped too, and it has to go first: otherwise the backslashes
        # inserted for the other characters would themselves be doubled.
        for char in ('\\', '*', '#', '_', '~', '`', '>'):
            s = s.replace(char, '\\' + char)
        return s

    escaped_args = [escape(value) for value in args]
    escaped_kwargs = {key: escape(value) for key, value in kwargs.items()}
    return template.format(*escaped_args, **escaped_kwargs)
# Clone `url` into `target_directory`, trying two different call signatures
# of pygit2.clone_repository in turn.
# Try to use pygit2 0.22 cloning
try:
shaker.libs.logger.Logger().debug("open_repository: "
"Trying to open repository "
"using pygit2 0.22 format")
repo = pygit2.clone_repository(url,
target_directory,
credentials=credentials)
# A TypeError from the call above is taken to mean that this pygit2 does not
# accept the `credentials=` keyword; the bound exception `e` is not used.
except TypeError as e:
shaker.libs.logger.Logger().debug("open_repository: "
"Failed to detect pygit2 0.22")
shaker.libs.logger.Logger().debug("open_repository: "
"Trying to open repository "
"using pygit2 0.23 format")
# Try to use pygit2 0.23 cloning
callbacks = pygit2.RemoteCallbacks(credentials)
repo = pygit2.clone_repository(url,
target_directory,
callbacks=callbacks)
# This message is logged after the clone call(s) above.
shaker.libs.logger.Logger().debug(":open_repository: "
"Cloning url '%s' into local repository '%s'"
% (url, target_directory))
# Look up the remote named 'origin', creating it when it is missing.
# NOTE(review): on Python 3 `filter` returns a lazy iterator, which is always
# truthy and cannot be indexed, so `if not origin` would never fire and
# `origin[0]` would raise TypeError -- confirm the targeted Python version.
origin = filter(lambda x: x.name == 'origin', repo.remotes)
if not origin:
repo.create_remote('origin', url)
origin = filter(lambda x: x.name == 'origin', repo.remotes)
# Attach the credentials to the origin remote.
origin[0].credentials = credentials
return repo
# NOTE(review): the `filter` parameter shadows the builtin of the same name
# inside this function.
def org_checkout(organization, github_url, github_token, clone_dir,
verbose, filter, exclude):
"""Checkout repositories from a GitHub organization."""
# Log at DEBUG level when `verbose` is truthy, otherwise at INFO level.
logging.basicConfig(
format="%(asctime)s: %(name)s:%(levelname)s %(message)s",
level=(verbose and logging.DEBUG or logging.INFO))
# The token is passed as the user name, with the fixed string
# 'x-oauth-basic' as the password.
callbacks = pygit2.RemoteCallbacks(
pygit2.UserPass(github_token, 'x-oauth-basic'))
repos = []
for r in github_repos(organization, github_url, github_token):
# When `filter` patterns are given, skip any repository whose name matches
# none of them (fnmatch-style matching).
if filter:
found = False
for f in filter:
if fnmatch(r['name'], f):
found = True
break
if not found:
continue
# `exclude` patterns are iterated next; the rest of that handling is not
# visible in this excerpt.
if exclude:
found = False
for e in exclude:
def repo(self):
    """Return the repository in ``self.folder``, cloning ``self.origin`` there if the folder is absent."""
    if os.path.exists(self.folder):
        return pygit2.Repository(self.folder)

    logger.info("Cloning {} into {}".format(self.origin, self.folder))
    remote_callbacks = pygit2.RemoteCallbacks(credentials=self.credentials)

    def make_remote(repository, remote_name, remote_url):
        # Create the remote from the arguments pygit2 passes to this hook.
        return repository.remotes.create(remote_name, remote_url)

    return pygit2.clone_repository(
        self.origin,
        self.folder,
        remote=make_remote,
        # checkout_branch=self.branch,
        callbacks=remote_callbacks,
    )
def add_data(self, name, data):
"""Store data under name when no entry with that name exists yet."""
# Only write when the lookup for `name` comes back as None.
if self.get(name) is None:
# The commit/log message is built by the `_message` helper for the 'add' action.
msg = self._message(name, 'add')
self.write(name, data, message=msg)
# NOTE(review): confirm whether `data` is returned only after a write or on
# every call, including when the entry already existed.
return data
def replace_data(self, name, data):
    """Overwrite the existing entry name with data.

    Returns ``False`` without writing anything when no entry called name
    exists; otherwise writes the new data and returns ``None``.
    """
    entry_missing = self.get(name) is None
    if entry_missing:
        return False
    self.write(name, data, message=self._message(name, 'update'))
# Subclass of pygit2.RemoteCallbacks that adds no behaviour of its own.
class Callback(pygit2.RemoteCallbacks):
pass
def pull(self, remote='origin'):
"""
:param remote: the remote to pull from
:return: the old commit id
"""
repo = self.repo
remote_ = repo.remotes[remote]
# Fetch with the stored credentials. A GitError is logged and otherwise
# ignored, so the code below continues with whatever refs already exist.
try:
remote_.fetch(callbacks=pygit2.RemoteCallbacks(credentials=self.credentials))
except GitError as e:
logger.error(repr(e))
pass
# Remember where HEAD points before any merge is attempted.
current_commit = repo.head.target
# pull not the origin remote
# if self.target_commit is None or remote != 'origin':
# use most recent of remote
# Target of the remote-tracking ref that has the same short name as the
# current HEAD, i.e. refs/remotes/<remote>/<head shorthand>.
commit_target = repo.lookup_reference('refs/remotes/{}/{}'.format(remote, repo.head.shorthand)).target
# else:
# commit_target = self.target_commit
# Only the first element of the merge analysis result is kept.
merge_result, _ = repo.merge_analysis(commit_target)
else:
# Build the clone URL from the `ext_git_url` environment variable.
repo_url = '%s/%s.git' % (os.environ['ext_git_url'], repo_name)
try:
clone_dir = clone_directory
# Create the clone directory when it does not exist yet.
if not os.path.isdir(clone_dir):
os.makedirs(clone_dir)
repo_path = os.path.join(clone_dir, uniq_path)
# Internal repositories are cloned with user/token credentials read from
# the environment; all others are cloned without credentials.
if internal==True:
username = os.environ['int_git_user']
password = os.environ['int_git_token']
login_info = git.UserPass(username, password)
#git_obj = git.clone_repository(repo_url, repo_path, credentials=login_info)
cb = git.RemoteCallbacks(credentials=login_info)
git_obj = git.clone_repository(repo_url, repo_path, callbacks=cb)
else:
#username = os.environ['ext_git_user']
#password = os.environ['ext_git_token']
git_obj = git.clone_repository(repo_url, repo_path)
# On success the local path of the clone is returned, not the repository object.
return repo_path
# NOTE(review): `except Exception, e` is Python 2-only syntax; `except
# Exception as e` is accepted by Python 2.6+ and Python 3.
except Exception, e:
# NOTE(review): str.find returns -1 (truthy) when the text is absent and 0
# (falsy) when it is at the very start, so this test is true for almost any
# message; a substring check with `in` is probably what was intended.
if str(e).find('Unexpected HTTP status code: 404'):
log.logger.error("repo doesn't exists")
return "Repo doesn't exists"
log.logger.error(e)