Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
shed_target_source = kwds.get("shed_target_source")
label_a = "_%s_" % (shed_target_source if shed_target_source else "workingdir")
shed_target = kwds.get("shed_target", "B")
if "/" in shed_target:
shed_target = "custom_shed"
label_b = "_%s_" % shed_target
mine = os.path.join(working, label_a)
other = os.path.join(working, label_b)
shed_context = get_shed_context(ctx, read_only=True, **kwds)
# In order to download the tarball, require repository ID...
repo_id = realized_repository.find_repository_id(ctx, shed_context)
if repo_id is None:
error("shed_diff: Repository [%s] does not exist in the targeted Tool Shed."
% realized_repository.name)
# $ diff README.rst not_a_file > /dev/null 2>&1; echo $?
# 2
return 2
info("Diffing repository [%s]" % realized_repository.name)
download_tarball(
ctx,
shed_context,
realized_repository,
destination=other,
clean=True,
destination_is_pattern=False,
**kwds
)
if shed_target_source:
new_kwds = kwds.copy()
def download_tarball(ctx, shed_context, realized_repository, **kwds):
    """Download the Tool Shed archive for ``realized_repository``.

    The repository id is resolved with
    ``realized_repository.find_repository_id`` and the download itself is
    delegated to ``download_tar``.

    Keyword options read from ``kwds``:

    * ``destination`` -- target path or file-name pattern; defaults to
      ``shed_download.tar.gz``.
    * ``destination_is_pattern`` -- when true (the default) the destination
      is expanded with ``realized_repository.pattern_to_file_name``;
      otherwise it is used verbatim.
    * ``clean`` -- when true and the destination does not end in ``gz``,
      remove the ``.hg_archival.txt`` file found under the destination.
      Defaults to false.

    Raises ``Exception`` (after reporting through ``error``) when no
    repository id can be found.
    """
    repo_id = realized_repository.find_repository_id(ctx, shed_context)
    if repo_id is None:
        message = "Unable to find repository id, cannot download."
        error(message)
        raise Exception(message)

    destination = kwds.get('destination', 'shed_download.tar.gz')
    if kwds.get("destination_is_pattern", True):
        destination = realized_repository.pattern_to_file_name(destination)

    # Anything that does not end in "gz" is handed to download_tar with
    # to_directory=True rather than as an archive file name.
    to_directory = not destination.endswith("gz")
    download_tar(shed_context.tsi, repo_id, destination, to_directory=to_directory)

    if to_directory and kwds.get("clean", False):
        archival_file = os.path.join(destination, ".hg_archival.txt")
        if os.path.exists(archival_file):
            os.remove(archival_file)
def _create_shed_config(ctx, path, **kwds):
name = kwds.get("name") or path_to_repo_name(os.path.dirname(path))
name_invalid = validate_repo_name(name)
if name_invalid:
error(name_invalid)
return 1
owner = kwds.get("owner")
if owner is None:
owner = ctx.global_config.get("shed_username")
owner_invalid = validate_repo_owner(owner)
if owner_invalid:
error(owner_invalid)
return 1
description = kwds.get("description") or name
long_description = kwds.get("long_description")
remote_repository_url = kwds.get("remote_repository_url")
homepage_url = kwds.get("homepage_url")
categories = kwds.get("category", [])
config = dict(
name=name,
def not_specifing_dependent_option(x, y):
    """Report option ``x`` being used without the option ``y`` it requires.

    Both option names are looked up in the ``kwds`` mapping taken from the
    surrounding scope. When ``x`` has a truthy value and ``y`` does not, an
    error is emitted through ``io.error`` and ``True`` is returned; in every
    other case the function returns ``None``.
    """
    if kwds.get(x) and not kwds.get(y):
        template = "Can only use the --%s option if also specifying --%s"
        message = template % (x, y)
        io.error(message)
        return True
    return None
def not_specifing_dependent_option(x, y):
    """Report option ``x`` being used without the option ``y`` it requires.

    Both option names are looked up in the ``kwds`` mapping taken from the
    surrounding scope. When ``x`` has a truthy value and ``y`` does not, an
    error is emitted through ``io.error`` and ``True`` is returned; in every
    other case the function returns ``None``.
    """
    if kwds.get(x) and not kwds.get(y):
        template = "Can only use the --%s option if also specifying --%s"
        message = template % (x, y)
        io.error(message)
        return True
    return None
condarc_override = kwds.get("condarc", condarc_override_default)
use_local = kwds.get("conda_use_local", False)
shell_exec = shell if use_planemo_shell else None
conda_context = conda_util.CondaContext(conda_prefix=conda_prefix,
ensure_channels=ensure_channels,
condarc_override=condarc_override,
use_local=use_local,
shell_exec=shell_exec)
handle_auto_init = kwds.get("handle_auto_init", False)
if handle_auto_init and not conda_context.is_installed():
auto_init = kwds.get("conda_auto_init", True)
failed = True
if auto_init:
if conda_context.can_install_conda():
if conda_util.install_conda(conda_context):
error(MESSAGE_ERROR_FAILED_INSTALL)
else:
failed = False
else:
error(MESSAGE_ERROR_CANNOT_INSTALL)
else:
error(MESSAGE_ERROR_NOT_INSTALLING)
if failed:
raise ExitCodeException(EXIT_CODE_FAILED_DEPENDENCIES)
if handle_auto_init:
conda_context.ensure_conda_build_installed_if_needed()
return conda_context
def name_to_command(name):
    """Convert a subcommand name to the cli function for that command.

    Command ``<name>`` is defined by the attribute ``cli`` of the module
    ``planemo.commands.cmd_<name>``; this function uses ``__import__`` to
    load that module and returns its ``cli`` attribute.

    If the import raises ``ImportError`` the problem is reported through
    ``error`` and ``None`` is returned.
    """
    try:
        if sys.version_info[0] == 2:
            # Python 2 only: reduce the name to an ASCII byte string,
            # replacing characters that cannot be encoded.
            name = name.encode('ascii', 'replace')
        mod_name = 'planemo.commands.cmd_' + name
        # A non-empty fromlist makes __import__ return the leaf module
        # instead of the top-level package.
        mod = __import__(mod_name, None, None, ['cli'])
    except ImportError as e:
        error("Problem loading command %s, exception %s" % (name, e))
        return None
    return mod.cli
def _create_shed_config(ctx, path, **kwds):
name = kwds.get("name") or path_to_repo_name(os.path.dirname(path))
name_invalid = validate_repo_name(name)
if name_invalid:
error(name_invalid)
return 1
owner = kwds.get("owner")
if owner is None:
owner = ctx.global_config.get("shed_username")
owner_invalid = validate_repo_owner(owner)
if owner_invalid:
error(owner_invalid)
return 1
description = kwds.get("description") or name
long_description = kwds.get("long_description")
remote_repository_url = kwds.get("remote_repository_url")
homepage_url = kwds.get("homepage_url")
categories = kwds.get("category", [])
config = dict(
name=name,
owner=owner,
description=description,
long_description=long_description,
remote_repository_url=remote_repository_url,
homepage_url=homepage_url,
categories=categories,
)
# Remove empty entries...