Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'Create local git mirror for remote {r}'.format(r=git_repo),
self._log_file)
# Update existing git mirror
os.chdir(mirror_path)
gcall('git --no-pager gc --auto',
'Cleanup local mirror before update {r}'.format(r=git_repo),
self._log_file)
gcall('git --no-pager fetch --all --tags --prune',
'Update local git mirror from remote {r}'.format(r=git_repo),
self._log_file)
finally:
git_release_lock(lock_path, self._log_file)
# Resolve HEAD symbolic reference to identify the default branch
head = git('--no-pager', 'symbolic-ref', '--short', 'HEAD', _tty_out=False).strip()
# If revision is HEAD, replace it by the default branch
if revision == 'HEAD':
revision = head
# If revision is a commit hash, a full intermediate clone is required before getting a shallow clone
if self._is_commit_hash(revision):
# Create intermediate clone from the local git mirror, chdir into it and fetch all commits
source_path = get_intermediate_clone_path_from_module(self._app, module)
if os.path.exists(source_path):
gcall('chmod -R u+rwx {p}'.format(p=source_path), 'Update rights on previous intermediate clone', self._log_file)
gcall('rm -rf {p}'.format(p=source_path), 'Removing previous intermediate clone', self._log_file)
os.makedirs(source_path)
os.chdir(source_path)
gcall('du -hs .', 'Display current build directory disk usage', self._log_file)
gcall('git --no-pager init', 'Git init intermediate clone', self._log_file)
def clean_checkout(comm):
    """Check out ``comm`` and remove untracked files from the work tree.

    Before switching, an info line of the form ``CO: <comm> <text>`` is
    logged, where ``<text>`` is the first line of the second value returned
    by ``get_commit_vitals(comm)``, cut to 60 characters plus ``...`` when
    that first line is longer.

    Args:
        comm: Commit identifier handed to ``get_commit_vitals`` and to
            ``git checkout``.

    Raises:
        Whatever ``get_commit_vitals`` or the ``sh.git`` calls raise; nothing
        is caught here.
    """
    # Only the middle value of the three is needed.
    # NOTE(review): presumably the commit message/summary -- confirm against
    # get_commit_vitals.
    _, summary, _ = get_commit_vitals(comm)
    # Isolate the first line *before* measuring it, so the ellipsis reflects
    # the length of that line rather than of the whole text.
    summary = summary.split("\n")[0]
    if len(summary) > 60:
        summary = summary[:60] + "..."
    logger.info("CO: %s %s", comm, summary)
    sh.git("checkout", comm, _tty_out=False)
    # Force-remove untracked files left over from the previous checkout.
    sh.git("clean", "-f")
@task
def dev():
    """Run ``git submodule init`` followed by ``git submodule update``."""
    for subcommand in ("init", "update"):
        sh.git("submodule", subcommand)
def git_iter(sep, *args, **kwargs):
    """Yield the pieces of a git command's stdout that lie between ``sep``.

    ``git --no-pager <args>`` is run through ``sh`` in iterating mode; the
    streamed output is re-assembled so that each yielded string is the text
    found between two occurrences of ``sep`` (the separator itself is
    dropped). Any non-empty text left after the last separator is yielded at
    the end.

    Args:
        sep: Separator to split on. The scan advances by exactly one
            position past each hit, so a single character is expected.
        *args: Arguments appended to ``git --no-pager``; also passed to
            ``loggit``.
        **kwargs: Extra keyword arguments forwarded to ``sh.git``.

    A ``sh.ErrorReturnCode`` is not propagated: its exit code is reported
    through ``status`` and whatever text was buffered is still yielded.
    """
    # stderr is diverted into this buffer; nothing in this function reads it.
    stderr_sink = io.StringIO()
    pending = []
    try:
        loggit(*args)
        stream = sh.git(
            '--no-pager', *args,
            _decode_errors='replace',
            _out_bufsize=512,
            _iter=True,
            _err=stderr_sink,
            **kwargs
        )
        for piece in stream:
            cut = piece.find(sep)
            while cut >= 0:
                # Everything buffered so far plus the text before the
                # separator forms one complete item.
                pending.append(piece[:cut])
                yield ''.join(pending)
                del pending[:]
                piece = piece[cut + 1:]
                cut = piece.find(sep)
            # No separator left: keep the remainder for the next piece.
            pending.append(piece)
    except sh.ErrorReturnCode as exc:
        status('exit_code=%s' % exc.exit_code)
    tail = ''.join(pending)
    if tail:
        yield tail
def _checkout(self, branch):
    """Reset the work tree and check out ``origin/<branch>``.

    Runs ``git clean -d --force``, ``git reset --hard`` and
    ``git checkout origin/<branch>`` in that order.

    Args:
        branch: Branch name; ``origin/`` is prepended before checkout.

    Returns:
        True when all three commands succeed. False when any of them raises
        ``sh.ErrorReturnCode``, in which case the failure is logged with the
        traceback.
    """
    try:
        for cleanup_args in (('clean', '-d', '--force'), ('reset', '--hard')):
            sh.git(*cleanup_args)
        sh.git('checkout', 'origin/' + branch)
    except sh.ErrorReturnCode:
        details = {'branch': branch, 'uri': self.repo['uri']}
        LOG.error('Unable to checkout branch %(branch)s from repo '
                  '%(uri)s. Ignore it',
                  details,
                  exc_info=True)
        return False
    else:
        return True
def get_git_matches(self, revision):
    """Return the output of a case-insensitive ``git grep`` at ``revision``.

    The search pattern is built by joining ``self.keywords`` with ``\\|``.

    Args:
        revision: Revision passed to ``git grep`` as the tree to search.

    Returns:
        The command output as ``str``, or ``''`` when the command exits with
        status 1 or any other error occurs.
    """
    try:
        # NOTE(review): the double quotes are part of the argument itself;
        # sh does not go through a shell, so git receives them literally.
        # Confirm this is the intended pattern before changing it.
        pattern = '"({})"'.format(r'\|'.join(self.keywords))
        return str(git('grep', '-i', '-e', pattern, revision, _tty_out=False))
    except sh.ErrorReturnCode_1:
        # Exit status 1 is treated as "nothing found" (presumably git grep's
        # no-match status).
        return ''
    except Exception:
        # Deliberately best-effort, but no longer swallows KeyboardInterrupt
        # or SystemExit the way a bare ``except:`` would.
        print('encoding error at revision: ', revision)
        return ''
def pull_all(repos_dir):
    """Run ``git pull`` for every entry found in ``repos_dir``.

    Each name returned by ``os.listdir(repos_dir)`` is passed to git via
    ``-C <repos_dir>/<name>``, so the process working directory is left
    untouched and a relative ``repos_dir`` works for every entry.

    Args:
        repos_dir: Directory whose entries are pulled.

    A pull that fails with ``sh.ErrorReturnCode`` is logged (its stderr) and
    the loop continues with the next entry.

    TODO: need to change to work on project folders.
    """
    for project in os.listdir(repos_dir):
        logger.info('Performing pull on: {0}'.format(project))
        try:
            sh.git('-C', os.path.join(repos_dir, project), 'pull')
        except sh.ErrorReturnCode as e:
            logger.error(e.stderr)
import yaml
import copy
from sh import git
from datetime import datetime, timedelta
from jinja2 import Environment, FileSystemLoader
from ghost_log import log
# Directory that contains this module; config.yml is read from the same place.
ROOT_PATH = os.path.dirname(os.path.realpath(__file__))
# safe_load builds plain Python objects only and needs no explicit Loader
# argument, unlike yaml.load.
with open(os.path.join(ROOT_PATH, 'config.yml'), 'r') as conf_file:
    config = yaml.safe_load(conf_file)
# Name of the checked-out revision, tried in this order:
#   1. the short symbolic name of HEAD,
#   2. a tag that points exactly at HEAD,
#   3. the abbreviated commit hash.
# ``except Exception`` keeps the fallback chain but lets KeyboardInterrupt and
# SystemExit through, which a bare ``except:`` would swallow at import time.
try:
    CURRENT_REVISION_NAME = git('symbolic-ref', '-q', '--short', 'HEAD', _tty_out=False).strip()
except Exception:
    try:
        CURRENT_REVISION_NAME = git('describe', '--tags', '--exact-match', _tty_out=False).strip()
    except Exception:
        CURRENT_REVISION_NAME = git('--no-pager', 'rev-parse', '--short', 'HEAD', _tty_out=False).strip()
try:
CURRENT_REVISION = dict(
current_revision=git('--no-pager', 'rev-parse', '--short', 'HEAD', _tty_out=False).strip(),
current_revision_date=git('log', '-1', '--format=%cD', _tty_out=False).strip(),
current_revision_name=CURRENT_REVISION_NAME.strip()
)
except:
CURRENT_REVISION = dict(
current_revision='unknown',
current_revision_date='unknown',
def remark_au_files():
    """Re-mark the listed files as assumed unchanged, then delete the list.

    The list is the file at ``au_fp(b)``; each of its lines, stripped of
    surrounding whitespace, is passed to
    ``git update-index --assume-unchanged`` run from ``self.root``. Nothing
    happens when the list file does not exist.
    """
    listing_path = au_fp(b)
    if os.path.exists(listing_path):
        with io.open(listing_path, mode='r', encoding=ENCODING) as listing:
            for entry in listing:
                git('update-index', '--assume-unchanged', entry.strip(),
                    _cwd=self.root)
        # The list has been consumed; remove it so it is not applied twice.
        os.remove(listing_path)
version = self.latest_version
# Construct patch URL from current version to the target version
patch_url = TEMPLATE_PATCH_URL.format(self.version, version)
# Download and apply patch
with tempfile.NamedTemporaryFile() as patch:
output = sh.wget(patch_url, output_document='-').stdout
# Replace package name with local one
patch.write(output.replace(DEFAULT_PACKAGE_NAME, self.package))
# Flush to disk so patch can be applied
patch.flush()
try:
# Apply patch, ensure the exit code is clean
sh.git.apply(patch.name, directory=self.path)
self.config.read()
self.version = version
except sh.ErrorReturnCode:
print('There was a problem applying the patch.')
return
if options.get('update_deps'):
self.update_dependencies()
if options.get('migrate'):
self.migrate_database()