Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _create_git(self):
"""create a book from a git repo"""
# NOTE(review): indentation was lost in this copy and the method appears to be
# cut off after the final `else:` below; the comments describe only what is visible.
# Clone only when nothing exists at self.path yet.
if not os.path.exists(self.path):
LOG.info(("Creating book {0} from {1}, branch: {2}" +
"").format(self.path, self.git, self.branch))
# NOTE(review): self.branch is logged above but is not passed to git.clone here.
git.clone(self.git, self.path)
else:
LOG.info("Book {0} already exists".format(self.path))
# The current working directory is saved before switching into self.path.
# NOTE(review): no os.chdir(cwd) is visible in this fragment - confirm it is restored.
cwd = os.getcwd()
os.chdir(self.path)
# NOTE(review): the remote comparison runs when skiprepourlcheck is truthy, which
# reads inverted relative to the flag's name - confirm the intended polarity.
if self.skiprepourlcheck:
remote_match_found = False
# Iterate the output of `git remote -v`; the second whitespace-separated field
# of each item is compared with self.git through Url(...) equality.
for remote in git("remote", "-v"):
remote_parts = remote.split()
if Url(remote_parts[1]) == Url(self.git):
remote_match_found = True
if remote_match_found:
LOG.debug('Found {0} in the list of remotes for {1}'.format(self.git, self.path))
else:
# NOTE(review): fragment - the enclosing function, the `try`/`if` headers that
# pair with the `else`/`except`/`elif` below, and the original indentation are
# not visible here. It appears to be part of a request handler - confirm.
return render_template("failed.html")
# !! TODO
# some post-processing once the file is uploaded
else:
return render_template("failed.html")
# The bare `except:` catches every exception; the print below is a Python 2
# print statement.
except:
print "No file selected"
elif url != "":
try:
if url:
# url_path is the last '/'-separated segment of url with its final '.suffix' removed.
url_path = (url.rsplit('/', 1)[1]).rsplit('.', 1)[0]
# !! TODO try/except
# Only URLs whose text after the last '.' is "git" are cloned.
if url.rsplit('.', 1)[1] == "git":
# !! TODO try/except - if the folder already exists
# The clone target is UPLOAD_FOLDER/url_path.
git.clone(url, path.join(app.config['UPLOAD_FOLDER'],
url_path))
# check for dockerfile at root
# check for dockerfile assuming repo is the services folder
if path.exists(path.join(app.config['UPLOAD_FOLDER'],
url_path,
"Dockerfile")) or path.exists(path.join(app.config['UPLOAD_FOLDER'],
url_path,
app.config['SERVICE_DICT']['dockerfile'])):
# check for existence of necessary files
# missing_files collects every SERVICE_DICT key/value pair whose value does not
# exist as a path under UPLOAD_FOLDER/url_path.
missing_files = {}
for key,value in app.config['SERVICE_DICT'].items():
if not path.exists(path.join(app.config['UPLOAD_FOLDER'],
url_path,
value)):
missing_files[key] = value
# NOTE(review): fragment - the enclosing method and the `try:` that pairs with
# the `finally:` below are not visible, and indentation was lost in this copy.
# When the mirror directory does not exist yet: create it, initialise a bare
# repository inside it and register two remotes (self.name and 'zabbix').
if not os.path.exists(git_local_mirror):
log("Creating local mirror [{r}] for the first time".format(r=git_local_mirror), self._log_file)
os.makedirs(git_local_mirror)
os.chdir(git_local_mirror)
git.init(['--bare'])
git.remote(['add', self.name, provisioner_git_repo])
git.remote(['add', 'zabbix', zabbix_repo])
# Fetch all remotes from inside the mirror directory.
log("Fetching local mirror [{r}] remotes".format(r=git_local_mirror), self._log_file)
os.chdir(git_local_mirror)
git.fetch(['--all'])
finally:
# Presumably releases the lock identified by lock_path - verify against git_release_lock.
git_release_lock(lock_path, self._log_file)
# Clone a single branch (provisioner_git_revision) into self.local_repo_path,
# passing the local mirror to git through --reference.
log("Cloning [{r}] repo with local mirror reference".format(r=provisioner_git_repo), self._log_file)
git.clone(['--reference', git_local_mirror, provisioner_git_repo, '-b', provisioner_git_revision, '--single-branch', self.local_repo_path + '/'])
# Submodule handling only runs when the clone contains a .gitmodules file.
if os.path.exists(self.local_repo_path + '/.gitmodules'):
os.chdir(self.local_repo_path)
log("Re-map submodules on local git mirror", self._log_file)
git_remap_submodule(self.local_repo_path, zabbix_repo, git_local_mirror, self._log_file)
log("Submodule init and update", self._log_file)
git.submodule('init')
git.submodule('update')
def __init__(self, args):
    """Initialise from parsed arguments, optionally cloning the service first.

    Copies ``port``, ``force``, ``privileged``, ``clone`` and ``test`` from
    ``args``. When ``args.clone`` is set, the repository is cloned with
    ``git clone`` (no explicit target, so git picks the directory name) and
    ``utils.change_root_dir`` is called with that directory name. In every
    case ``self.dir`` is then read from ``utils.ROOT_DIR`` and the parent
    initialiser is invoked.

    Args:
        args: Object exposing the ``port``, ``force``, ``privileged``,
            ``clone`` and ``test`` attributes; it is also forwarded to the
            parent ``__init__``.

    Raises:
        RuntimeError: If ``git clone`` exits with status 128, which is
            reported as the service directory already existing. The original
            ``sh`` error is attached as the cause.
    """
    # self.reqfile = args.reqfile
    self.port = args.port
    self.force = args.force
    self.privileged = args.privileged
    self.clone = args.clone
    self.test = args.test
    # if clone, clone first
    if self.clone:
        from posixpath import basename
        from urllib.parse import urlparse

        # Derive the directory name the same way `git clone` does when no
        # target is given: trailing slashes and a trailing ".git" (or "/.git")
        # are not part of the name. Using the raw basename would give
        # "repo.git" (or "" for a trailing slash) and point the root at a
        # directory git never created.
        repo_path = urlparse(self.clone).path.rstrip("/")
        if repo_path.endswith(".git"):
            repo_path = repo_path[:-len(".git")].rstrip("/")
        service_dir = basename(repo_path)
        try:
            sh.git.clone(self.clone)
        except sh.ErrorReturnCode_128 as exc:
            raise RuntimeError(
                "Service '{}' already exists, can't clone".format(service_dir)
            ) from exc
        log.debug("Cloned service from {} into {}".format(self.clone, service_dir))
        # update root dir
        utils.change_root_dir(service_dir)
    # These run whether or not a clone was requested.
    self.dir = utils.ROOT_DIR
    super().__init__(args)
def manage_checkout(self):
    """Make sure a checkout of ``self.url`` exists at ``self.checkout_dir``.

    Creates ``self.checkouts_dir`` when it is missing. If the checkout
    directory already exists it is refreshed with ``git fetch -a`` followed
    by ``git pull --rebase``; otherwise the repository is cloned into it.
    (Could probably be replaced with pygit.)
    """
    logger.info('manage ansible checkout for NWO updates')

    parent_present = os.path.exists(self.checkouts_dir)
    if not parent_present:
        os.makedirs(self.checkouts_dir)

    # Existing checkout: bring it up to date and stop.
    if os.path.exists(self.checkout_dir):
        logger.info('git fetch -a')
        git.fetch('-a', _cwd=self.checkout_dir)
        logger.info('git pull --rebase')
        git.pull('--rebase', _cwd=self.checkout_dir)
        return

    # No checkout yet: clone from scratch.
    clone_message = 'git clone %s %s' % (self.url, self.checkout_dir)
    logger.info(clone_message)
    git.clone(self.url, self.checkout_dir)
# NOTE(review): fragment of a method - its header is not visible here and the
# original indentation was lost; `nmpi_job` and `job_desc` come from outside it.
# Parse the job's "code" field as a URL to decide how the code is obtained.
url_candidate = urlparse(nmpi_job['code'])
logger.debug("Get code: %s %s", url_candidate.netloc, url_candidate.path)
# Archive case: the value has a URL scheme and its path ends in .tar.gz, .zip or .tgz.
if url_candidate.scheme and url_candidate.path.endswith((".tar.gz", ".zip", ".tgz")):
self._create_working_directory(job_desc.working_directory)
# The download keeps the archive's file name inside the working directory.
target = os.path.join(job_desc.working_directory, os.path.basename(url_candidate.path))
urlretrieve(nmpi_job['code'], target)
logger.info("Retrieved file from {} to local target {}".format(nmpi_job['code'], target))
# Unpack into the working directory with tar or unzip, chosen by extension.
if url_candidate.path.endswith((".tar.gz", ".tgz")):
tar("xzf", target, directory=job_desc.working_directory)
elif url_candidate.path.endswith(".zip"):
unzip(target, d=job_desc.working_directory)
else:
try:
# Check the "code" field for a git url (clone it into the workdir) or a script (create a file into the workdir)
# URL: use git clone
git.clone('--recursive', nmpi_job['code'], job_desc.working_directory)
logger.info("Cloned repository {}".format(nmpi_job['code']))
# NOTE(review): in the sh library the numbered ErrorReturnCode_N classes derive
# from ErrorReturnCode, so listing ErrorReturnCode_128 looks redundant - confirm.
except (sh.ErrorReturnCode_128, sh.ErrorReturnCode):
# SCRIPT: create file (in the current directory)
logger.info("The code field appears to be a script.")
self._create_working_directory(job_desc.working_directory)
# The raw "code" text is written as UTF-8 to the path in job_desc.arguments[0].
with codecs.open(job_desc.arguments[0], 'w', encoding='utf8') as job_main_script:
job_main_script.write(nmpi_job['code'])
def git_clone(git_repo_url, git_repo_dir):
    """Clone ``git_repo_url`` into ``git_repo_dir``.

    git's stdout and stderr are forwarded to this process's ``sys.stdout``
    and ``sys.stderr``.

    Raises:
        CommandError: Wrapping the ``ErrorReturnCode`` raised when the
            ``git clone`` command fails.
    """
    logger.info('Fetching from %s to %s', git_repo_url, git_repo_dir)
    stream_options = {'_out': sys.stdout, '_err': sys.stderr}
    try:
        git.clone(git_repo_url, git_repo_dir, **stream_options)
    except ErrorReturnCode as failure:
        raise CommandError(failure)
def getpackages(self, **kwargs):
# NOTE(review): indentation was lost in this copy and the method continues past
# what is visible here (the rest of the loop body and any return are cut off).
# Settings come from self.config_options plus the optional `dev_mode` keyword.
repo = self.config_options.gitrepo_repo
distro_branch = self.config_options.distro
datadir = self.config_options.datadir
skip_dirs = self.config_options.skip_dirs
dev_mode = kwargs.get('dev_mode')
packages = []
# The local clone lives at <datadir>/package_info and is created when absent.
gitpath = os.path.join(datadir, 'package_info')
if not os.path.exists(gitpath):
sh.git.clone(repo, gitpath)
if not dev_mode:
# Pre-bind the git command to gitpath with _tty_out=False and _timeout=3600.
git = sh.git.bake(_cwd=gitpath, _tty_out=False, _timeout=3600)
git.fetch("origin")
# Use the configured distro branch, fall back to master
# if it fails
try:
git.reset("--hard", "origin/%s" % distro_branch)
except Exception:
logger.info("Falling back to master")
git.reset("--hard", "origin/master")
for basepath in self.config_options.gitrepo_dirs:
# Leading/trailing '/' are stripped before joining; presumably so an absolute
# basepath does not make os.path.join discard gitpath - confirm.
path = basepath.strip('/')
packagepath = os.path.join(gitpath, path)
def _clone_repo(self, name, url, branch, key):
    """Clone ``branch`` of ``url`` into ``<catalog_dir>/<name>``.

    The environment for the git process is obtained from
    ``self.__git_env(key)``.
    """
    destination = os.path.join(self._catalog_config.catalog_dir, name)
    clone_env = self.__git_env(key)
    git.clone(url, destination, '--branch', branch, _env=clone_env)