Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_local_repo_path(bw_repo_path, repo_name):
    """
    From the given BundleWrap repo, get the filesystem path to the git
    repo associated with the given internal repo name.

    The mapping is read from the file REPO_MAP_FILENAME inside
    bw_repo_path. Blank lines and lines starting with "#" are skipped;
    every other line is split at its first ":" into a repo name and a
    path.

    Raises RepositoryError if the repo map file does not exist or if a
    line has no ":" separator.
    """
    repo_map_path = join(bw_repo_path, REPO_MAP_FILENAME)
    if not isfile(repo_map_path):
        # explain on stderr how to create the missing file before failing
        io.stderr(_("missing repo map for git_deploy at {}").format(repo_map_path))
        io.stderr(_("you must create this file with the following format:"))
        # NOTE(review): the format hint below is only " : " -- the
        # placeholder text on both sides of the colon appears to be
        # missing; confirm against the project's original wording
        io.stderr(_(" : "
                    ""))
        io.stderr(_("since the path is local, you should also add the "
                    "{} file to your gitignore").format(REPO_MAP_FILENAME))
        raise RepositoryError(_("missing repo map for git_deploy"))

    # reuse the path computed above instead of joining it a second time
    with open(repo_map_path) as f:
        repo_map = f.readlines()

    for line in repo_map:
        if not line.strip() or line.startswith("#"):
            continue
        try:
            repo, path = line.split(":", 1)
        except ValueError:
            # str.split() itself cannot fail here; only the two-way
            # unpacking can, when the line has no ":". Catching just
            # ValueError keeps KeyboardInterrupt/SystemExit from being
            # turned into a RepositoryError.
            raise RepositoryError(_("unable to parse line from {path}: '{line}'").format(
                line=line,
                path=repo_map_path,
            ))
        # NOTE(review): this excerpt ends here -- `repo`, `path` and the
        # `repo_name` argument are not used in the visible code and
        # nothing is returned; confirm the lookup against the full source
# NOTE(review): fragment -- the `try:` these handlers belong to and the
# enclosing function are not part of this excerpt.
except NoSuchRepository:
# assuming `dirname` is os.path.dirname: a path equals its own dirname only
# at the filesystem root, so there is no parent directory left to try
if path == dirname(path):
io.stderr(_(
"{x} {path} "
"is not a BundleWrap repository."
).format(path=quote(abspath(pargs.repo_path)), x=red("!!!")))
io.deactivate()
exit(1)
else:
# move one directory up; presumably the enclosing code retries with the
# new path -- not visible here, confirm against the full source
path = dirname(path)
except MissingRepoDependency as exc:
# print only the exception's message, then deactivate io and exit with 1
io.stderr(str(exc))
io.deactivate()
exit(1)
except Exception:
# any other error: print what format_exc() returns (presumably the full
# traceback, if this is traceback.format_exc) before exiting with 1
io.stderr(format_exc())
io.deactivate()
exit(1)
# convert all string args into text
text_pargs = {key: force_text(value) for key, value in vars(pargs).items()}
# call the handler stored on the parsed arguments with the repo and the
# converted arguments; io.deactivate() runs even if the handler raises
try:
pargs.func(repo, text_pargs)
finally:
io.deactivate()
# pargs.profile doubles as the "profiling requested" flag and as the
# argument handed to dump_stats()
if pargs.profile:
profile.disable()
profile.dump_stats(pargs.profile)
def handle_exception(task_id, exception, traceback):
    """
    Report a failed task on stderr and remember it in `errors`.

    Calls io.progress_advance() first, then prints the traceback, the
    repr() of the exception and a summary line prefixed with a red "!".
    The summary (without the "!" prefix) is appended to `errors`, which
    is defined outside this function.
    """
    io.progress_advance()
    summary = "{0} {1}".format(bold(task_id), exception)
    io.stderr(traceback)
    io.stderr(repr(exception))
    flagged_summary = "{0} {1}".format(red("!"), summary)
    io.stderr(flagged_summary)
    errors.append(summary)
# NOTE(review): fragment -- the enclosing method's header and the code that
# defines `manifest`, `plugin`, `force`, `old_version` and `new_version`
# are not part of this excerpt.
# download every file listed in manifest['provides'] from
# <base_url>/<plugin>/<file> to the same relative name below self.path
for file in manifest['provides']:
target_path = join(self.path, file)
url = "{}/{}/{}".format(self.base_url, plugin, file)
download(url, target_path)
# make file read-only to discourage users from editing them
# which will block future updates of the plugin
chmod(target_path, S_IREAD | S_IRGRP | S_IROTH)
# check for files that have been removed in the new version
for file, db_checksum in self.plugin_db[plugin]['files'].items():
if file not in manifest['provides']:
file_path = join(self.path, file)
current_checksum = hash_local_file(file_path)
# the checksum stored in plugin_db differs from the one computed for the
# file on disk: keep the file unless `force` is set
if db_checksum != current_checksum and not force:
io.stderr(_(
"not removing '{path}' because it has been modified since installation"
).format(path=file_path))
continue
remove(file_path)
self.record_as_installed(plugin, manifest)
return (old_version, new_version)
def handle_exception(task_id, exception, traceback):
    """
    Report a failed task, with the amount of detail depending on the
    exception type.

    - ItemDependencyLoop: every line from explain_item_dependency_loop()
      is printed to stderr and appended to `errors`.
    - GracefulApplyException: only the summary line is appended to
      `errors`; nothing is printed.
    - anything else: traceback, repr() of the exception and the summary
      line are printed to stderr, and the summary is appended to `errors`.

    `errors` is defined outside this function.
    """
    template = _("{x} {node} {msg}")
    summary = template.format(
        node=bold(task_id),
        msg=exception,
        x=red("!"),
    )

    if isinstance(exception, ItemDependencyLoop):
        for explanation in explain_item_dependency_loop(exception, task_id):
            io.stderr(explanation)
            errors.append(explanation)
        return

    # graceful failures are recorded without any output
    if not isinstance(exception, GracefulApplyException):
        io.stderr(traceback)
        io.stderr(repr(exception))
        io.stderr(summary)
    errors.append(summary)
def handle_exception(task_id, exception, traceback):
    """
    Report a failed task on stderr and remember it in `errors`.

    Prints the traceback, the repr() of the exception and a
    "<task_id>: <exception>" summary line, then appends that summary to
    `errors`, which is defined outside this function.
    """
    summary = "{0}: {1}".format(task_id, exception)
    report = io.stderr
    report(traceback)
    report(repr(exception))
    report(summary)
    errors.append(summary)
):
# NOTE(review): fragment -- the condition closed by the "):" above and the
# enclosing function header are not part of this excerpt.
# skipped for "unless" or "not triggered", don't output those
return
formatted_result = format_item_result(
status_code,
node.name,
item.bundle.name if item.bundle else "", # dummy items don't have bundles
item.id,
interactive=interactive,
details=details,
)
# nothing is printed when format_item_result() returned None
if formatted_result is not None:
# failed items are reported on stderr, followed by the formatted command
# results if the item has any; every other status is printed to stdout
if status_code == Item.STATUS_FAILED:
io.stderr(formatted_result)
if item._command_results:
io.stderr(format_item_command_results(item._command_results))
# free up memory
del item._command_results
else:
io.stdout(formatted_result)
# NOTE(review): fragment -- the enclosing function header and the code that
# builds `cmdline` and `repo_dir` are not part of this excerpt.
# log the command line and the directory it will run in
io.debug(_("running '{}' in {}").format(
" ".join(cmdline),
repo_dir,
))
# run the command in repo_dir with both output streams captured;
# `setpgrp` is called in the child before exec (presumably os.setpgrp,
# which would put the child into its own process group -- confirm)
git_process = Popen(
cmdline,
cwd=repo_dir,
preexec_fn=setpgrp,
stderr=PIPE,
stdout=PIPE,
)
# wait for the process to finish and collect everything it wrote
stdout, stderr = git_process.communicate()
# FIXME integrate this into Item._command_results
if git_process.returncode != 0:
# NOTE(review): no text mode is requested from Popen, so on Python 3
# `stdout`/`stderr` are bytes and the two messages below will show their
# bytes repr; only the return value is decoded -- confirm if intended
io.stderr(_("failed command: {}").format(" ".join(cmdline)))
io.stderr(_("stdout:\n{}").format(stdout))
io.stderr(_("stderr:\n{}").format(stderr))
# cmdline[1] is used as the git subcommand name in the error message
raise RuntimeError(_("`git {command}` failed in {dir}").format(
command=cmdline[1],
dir=repo_dir,
))
# on success, return the captured stdout decoded as UTF-8 with surrounding
# whitespace stripped
return stdout.decode('utf-8').strip()
def handle_exception(task_id, exception, traceback):
    """
    Print details about a failed task and record a one-line summary.

    The traceback and the repr() of the exception go to stderr first,
    followed by a "<task_id>: <exception>" line. That same line is then
    appended to `errors`, which is defined outside this function.
    """
    failure_line = "{0}: {1}".format(task_id, exception)
    io.stderr(traceback)
    exception_repr = repr(exception)
    io.stderr(exception_repr)
    io.stderr(failure_line)
    errors.append(failure_line)