Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
):
num_tests = test_results.num_tests
num_problems = test_results.num_problems
if num_tests == 0:
warn(NO_TESTS_MESSAGE)
return
if num_problems == 0:
info(ALL_TESTS_PASSED_MESSAGE % num_tests)
if num_problems:
html_report_file = test_results.output_html_path
message_args = (num_problems, num_tests, html_report_file)
message = PROBLEM_COUNT_MESSAGE % message_args
warn(message)
for testcase_el in test_results.xunit_testcase_elements:
structured_data_tests = test_results.structured_data_tests
__summarize_test_case(structured_data_tests, testcase_el, **kwds)
def __handle_summary(
    test_results,
    **kwds
):
    """Emit a console summary of a test run, honouring the ``summary`` option.

    Nothing is printed when ``kwds["summary"]`` equals ``"none"``. When
    ``test_results.has_details`` is truthy the work is delegated to
    ``__summarize_tests_full``; otherwise a single generic message is
    chosen based on ``test_results.exit_code``.
    """
    # The caller explicitly asked for no summary output at all.
    if kwds.get("summary") == "none":
        return

    # Detailed results are available - produce the full breakdown.
    if test_results.has_details:
        __summarize_tests_full(
            test_results,
            **kwds
        )
        return

    # No details to inspect: fall back to a generic pass/fail message.
    if test_results.exit_code:
        warn(GENERIC_PROBLEMS_MESSAGE % test_results.output_html_path)
    else:
        info(GENERIC_TESTS_PASSED_MESSAGE)
def __summarize_tests_full(
    test_results,
    **kwds
):
    """Print the overall test tally followed by a summary of each test case.

    Warns and returns early when ``test_results.num_tests`` is zero.
    Otherwise reports either that all tests passed or how many problems
    were found (including the HTML report path), then calls
    ``__summarize_test_case`` once per element of
    ``test_results.xunit_testcase_elements``, forwarding ``kwds``.
    """
    total = test_results.num_tests
    problems = test_results.num_problems

    # Without any tests there is nothing further to summarize.
    if total == 0:
        warn(NO_TESTS_MESSAGE)
        return

    if problems == 0:
        info(ALL_TESTS_PASSED_MESSAGE % total)
    elif problems:
        report_path = test_results.output_html_path
        warn(PROBLEM_COUNT_MESSAGE % (problems, total, report_path))

    for testcase_el in test_results.xunit_testcase_elements:
        # Read the structured data per iteration, as the original did, so
        # it is never touched when there are no test case elements.
        __summarize_test_case(
            test_results.structured_data_tests,
            testcase_el,
            **kwds
        )
\b
% planemo run cat1-tool.cwl cat-job.json
"""
path = uri_to_path(ctx, uri)
# TODO: convert UI to runnable and do a better test of cwl.
is_cwl = path.endswith(".cwl")
kwds["cwl"] = is_cwl
if kwds.get("engine", None) is None:
kwds["engine"] = "galaxy" if not is_cwl else "cwltool"
with engine_context(ctx, **kwds) as engine:
run_result = engine.run(path, job_path)
if not run_result.was_successful:
warn("Run failed [%s]" % unicodify(run_result))
ctx.exit(1)
outputs_dict = run_result.outputs_dict
print(outputs_dict)
output_json = kwds.get("output_json", None)
if output_json:
with open(output_json, "w") as f:
json.dump(outputs_dict, f)
return 0
def cli(ctx, **kwds):
    """Download and install conda.
    This will download conda for managing dependencies for your platform
    using the appropriate Miniconda installer.
    By running this command, you are agreeing to the terms of the conda
    license a 3-clause BSD 3 license. Please review full license at
    http://docs.continuum.io/anaconda/eula.
    Planemo will print a warning and terminate with an exit code of 7
    if Conda is already installed.
    """
    conda_context = build_conda_context(ctx, **kwds)

    if conda_context.is_conda_installed():
        # Refuse to reinstall over an existing conda; report and bail out
        # with the dedicated "already exists" exit code.
        warn(MESSAGE_ERROR_ALREADY_EXISTS % conda_context.conda_exec)
        exit_code = EXIT_CODE_ALREADY_EXISTS
    else:
        exit_code = conda_util.install_conda(
            conda_context=conda_context,
            force_conda_build=True,
        )
        # A truthy installer result is treated as failure.
        if exit_code:
            report, template = warn, MESSAGE_ERROR_FAILED
        else:
            report, template = info, MESSAGE_INSTALL_OKAY
        report(template % conda_context.conda_exec)

    # Hand the resulting status back through the command context.
    ctx.exit(exit_code)
if kwds.get("check_diff", False):
is_diff = diff_repo(ctx, realized_repository, **kwds) != 0
if not is_diff:
name = realized_repository.name
info("Repository [%s] not different, skipping upload." % name)
return 0
# TODO: support updating repo information if it changes in the config file
try:
shed_context.tsi.repositories.update_repository(
str(repo_id), tar_path, **update_kwds
)
except Exception as e:
if isinstance(e, bioblend.ConnectionError) and e.status_code == 400 and \
'"No changes to repository."' in e.body:
warn("Repository %s was not updated because there were no changes" % realized_repository.name)
return 0
message = api_exception_to_message(e)
error("Could not update %s" % realized_repository.name)
error(message)
return -1
info("Repository %s updated successfully." % realized_repository.name)
return 0