Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def update_repo(self, name):
def run(*args, **kwargs):
env = os.environ.copy()
env['GIT_TERMINAL_PROMPT'] = '0'
kwargs['env'] = env
return sp_run(*args, **kwargs)
try:
dd = self.path
if name not in self.repos:
raise UpdateError("Repo does not exist in data, wtf")
folder = os.path.join(dd, name)
# Make sure we don't git reset the Red folder on accident
if not os.path.exists(os.path.join(folder, '.git')):
#if os.path.exists(folder):
#shutil.rmtree(folder)
url = self.repos[name].get('url')
if not url:
raise UpdateError("Need to clone but no URL set")
branch = None
if "@" in url: # Specific branch
url, branch = url.rsplit("@", maxsplit=1)
if branch is None:
p = run(["git", "clone", url, folder])
else:
p = run(["git", "clone", "-b", branch, url, folder])
if p.returncode != 0:
p = run(rpcmd, stdout=PIPE)
if p.returncode != 0:
raise UpdateError("Unable to determine new commit hash")
newhash = p.stdout.decode().strip()
if oldhash == newhash:
return name, REPO_SAME, None
else:
self.populate_list(name)
self.save_repos()
ret = {}
cmd = ['git', '-C', folder, 'diff', '--no-commit-id',
'--name-status', oldhash, newhash]
p = run(cmd, stdout=PIPE)
if p.returncode != 0:
raise UpdateError("Error in git diff")
changed = p.stdout.strip().decode().split('\n')
for f in changed:
if not f.endswith('.py'):
continue
status, _, cogpath = f.partition('\t')
split = os.path.split(cogpath)
cogdir, cogname = split[-2:]
cogname = cogname[:-3] # strip .py
if len(split) != 2 or cogdir != cogname:
continue
if status not in ret:
env['GIT_TERMINAL_PROMPT'] = '0'
kwargs['env'] = env
return sp_run(*args, **kwargs)
try:
dd = self.path
if name not in self.repos:
raise UpdateError("Repo does not exist in data, wtf")
folder = os.path.join(dd, name)
# Make sure we don't git reset the Red folder on accident
if not os.path.exists(os.path.join(folder, '.git')):
#if os.path.exists(folder):
#shutil.rmtree(folder)
url = self.repos[name].get('url')
if not url:
raise UpdateError("Need to clone but no URL set")
branch = None
if "@" in url: # Specific branch
url, branch = url.rsplit("@", maxsplit=1)
if branch is None:
p = run(["git", "clone", url, folder])
else:
p = run(["git", "clone", "-b", branch, url, folder])
if p.returncode != 0:
raise CloningError()
self.populate_list(name)
return name, REPO_CLONE, None
else:
rpbcmd = ["git", "-C", folder, "rev-parse", "--abbrev-ref", "HEAD"]
p = run(rpbcmd, stdout=PIPE)
branch = p.stdout.decode().strip()
rpcmd = ["git", "-C", folder, "rev-parse", branch]
p = run(["git", "-C", folder, "reset", "--hard",
"origin/%s" % branch, "-q"])
if p.returncode != 0:
raise UpdateError("Error resetting to origin/%s" % branch)
p = run(rpcmd, stdout=PIPE)
if p.returncode != 0:
raise UpdateError("Unable to determine old commit hash")
oldhash = p.stdout.decode().strip()
p = run(["git", "-C", folder, "pull", "-q", "--ff-only"])
if p.returncode != 0:
raise UpdateError("Error pulling updates")
p = run(rpcmd, stdout=PIPE)
if p.returncode != 0:
raise UpdateError("Unable to determine new commit hash")
newhash = p.stdout.decode().strip()
if oldhash == newhash:
return name, REPO_SAME, None
else:
self.populate_list(name)
self.save_repos()
ret = {}
cmd = ['git', '-C', folder, 'diff', '--no-commit-id',
'--name-status', oldhash, newhash]
p = run(cmd, stdout=PIPE)
if p.returncode != 0:
raise UpdateError("Error in git diff")
changed = p.stdout.strip().decode().split('\n')
" and its community have no responsibility for any potential "
"damage that the content of 3rd party repositories might cause."
"\nBy typing 'I agree' you declare to have read and understand "
"the above message. This message won't be shown again until the"
" next reboot.")
class UpdateError(Exception):
pass
class CloningError(UpdateError):
pass
class RequirementFail(UpdateError):
pass
class Downloader:
"""Cog downloader/installer."""
def __init__(self, bot):
self.bot = bot
self.disclaimer_accepted = False
self.path = os.path.join("data", "downloader")
self.file_path = os.path.join(self.path, "repos.json")
# {name:{url,cog1:{installed},cog1:{installed}}}
self.repos = dataIO.load_json(self.file_path)
self.executor = ThreadPoolExecutor(NUM_THREADS)
self._do_first_run()
rpbcmd = ["git", "-C", folder, "rev-parse", "--abbrev-ref", "HEAD"]
p = run(rpbcmd, stdout=PIPE)
branch = p.stdout.decode().strip()
rpcmd = ["git", "-C", folder, "rev-parse", branch]
p = run(["git", "-C", folder, "reset", "--hard",
"origin/%s" % branch, "-q"])
if p.returncode != 0:
raise UpdateError("Error resetting to origin/%s" % branch)
p = run(rpcmd, stdout=PIPE)
if p.returncode != 0:
raise UpdateError("Unable to determine old commit hash")
oldhash = p.stdout.decode().strip()
p = run(["git", "-C", folder, "pull", "-q", "--ff-only"])
if p.returncode != 0:
raise UpdateError("Error pulling updates")
p = run(rpcmd, stdout=PIPE)
if p.returncode != 0:
raise UpdateError("Unable to determine new commit hash")
newhash = p.stdout.decode().strip()
if oldhash == newhash:
return name, REPO_SAME, None
else:
self.populate_list(name)
self.save_repos()
ret = {}
cmd = ['git', '-C', folder, 'diff', '--no-commit-id',
'--name-status', oldhash, newhash]
p = run(cmd, stdout=PIPE)
if p.returncode != 0:
raise UpdateError("Error in git diff")
raise CloningError()
self.populate_list(name)
return name, REPO_CLONE, None
else:
rpbcmd = ["git", "-C", folder, "rev-parse", "--abbrev-ref", "HEAD"]
p = run(rpbcmd, stdout=PIPE)
branch = p.stdout.decode().strip()
rpcmd = ["git", "-C", folder, "rev-parse", branch]
p = run(["git", "-C", folder, "reset", "--hard",
"origin/%s" % branch, "-q"])
if p.returncode != 0:
raise UpdateError("Error resetting to origin/%s" % branch)
p = run(rpcmd, stdout=PIPE)
if p.returncode != 0:
raise UpdateError("Unable to determine old commit hash")
oldhash = p.stdout.decode().strip()
p = run(["git", "-C", folder, "pull", "-q", "--ff-only"])
if p.returncode != 0:
raise UpdateError("Error pulling updates")
p = run(rpcmd, stdout=PIPE)
if p.returncode != 0:
raise UpdateError("Unable to determine new commit hash")
newhash = p.stdout.decode().strip()
if oldhash == newhash:
return name, REPO_SAME, None
else:
self.populate_list(name)
self.save_repos()
ret = {}
cmd = ['git', '-C', folder, 'diff', '--no-commit-id',
'--name-status', oldhash, newhash]
else:
p = run(["git", "clone", "-b", branch, url, folder])
if p.returncode != 0:
raise CloningError()
self.populate_list(name)
return name, REPO_CLONE, None
else:
rpbcmd = ["git", "-C", folder, "rev-parse", "--abbrev-ref", "HEAD"]
p = run(rpbcmd, stdout=PIPE)
branch = p.stdout.decode().strip()
rpcmd = ["git", "-C", folder, "rev-parse", branch]
p = run(["git", "-C", folder, "reset", "--hard",
"origin/%s" % branch, "-q"])
if p.returncode != 0:
raise UpdateError("Error resetting to origin/%s" % branch)
p = run(rpcmd, stdout=PIPE)
if p.returncode != 0:
raise UpdateError("Unable to determine old commit hash")
oldhash = p.stdout.decode().strip()
p = run(["git", "-C", folder, "pull", "-q", "--ff-only"])
if p.returncode != 0:
raise UpdateError("Error pulling updates")
p = run(rpcmd, stdout=PIPE)
if p.returncode != 0:
raise UpdateError("Unable to determine new commit hash")
newhash = p.stdout.decode().strip()
if oldhash == newhash:
return name, REPO_SAME, None
else:
self.populate_list(name)
self.save_repos()