Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def mk_tool(
    schema: Names,
    opts: List[str],
    reqs: Optional[List[CommentedMap]] = None,
    hints: Optional[List[CommentedMap]] = None,
) -> Tuple[LoadingContext, RuntimeContext, CommentedMap]:
    """Assemble a tool document plus the contexts needed to work with it.

    The tool document is obtained from ``basetool.copy()``; ``reqs`` and
    ``hints`` are attached to it only when they are not ``None``.  ``opts``
    is parsed with the cwltool argument parser and extensions are switched
    on in the parsed arguments before the contexts are created from them.

    :param schema: stored on the loading context as ``avsc_names``.
    :param opts: command-line style options for the cwltool argument parser.
    :param reqs: optional entries for the tool's ``"requirements"`` key.
    :param hints: optional entries for the tool's ``"hints"`` key.
    :return: ``(loading context, runtime context, tool document)``.
    """
    tool_doc = basetool.copy()
    # Same two optional sections, handled in the same order by one loop.
    for section, entries in (("requirements", reqs), ("hints", hints)):
        if entries is not None:
            tool_doc[section] = CommentedSeq(entries)

    parsed = cwltool.argparser.arg_parser().parse_args(opts)
    parsed.enable_ext = True

    runtime_ctx = RuntimeContext(vars(parsed))
    loading_ctx = cwltool.main.setup_loadingContext(None, runtime_ctx, parsed)
    loading_ctx.avsc_names = schema
    return loading_ctx, runtime_ctx, tool_doc
def test_factory_partial_output() -> None:
    """Check the outputs carried by the error raised when wffail.cwl is run.

    The runtime context uses ``on_error = "continue"``; the raised
    ``WorkflowStatus`` must expose a checksum for ``out1`` and ``None``
    for ``out2``.
    """
    ctx = RuntimeContext()
    ctx.on_error = "continue"
    workflow_factory = cwltool.factory.Factory(runtime_context=ctx)

    with pytest.raises(cwltool.factory.WorkflowStatus) as caught:
        failing_workflow = workflow_factory.make(get_data("tests/wf/wffail.cwl"))
        failing_workflow()

    partial = caught.value.out
    expected_checksum = "sha1$e5fa44f2b31c1fb553b6021e7360d07d5d91ff5e"
    assert partial["out1"]["checksum"] == expected_checksum
    assert partial["out2"] is None
def test_factory_redefined_args() -> None:
    """Check that Factory exposes the settings of the context passed to it."""
    ctx = RuntimeContext()
    ctx.use_container = False
    ctx.on_error = "continue"

    built = cwltool.factory.Factory(runtime_context=ctx)

    effective = built.runtime_context
    assert effective.use_container is False
    assert effective.on_error == "continue"
def test_compute_checksum() -> None:
    """Run cat-tool.cwl on whale.txt with checksums enabled and verify the SHA-1."""
    ctx = RuntimeContext()
    ctx.compute_checksum = True
    # Container use is taken directly from the onWindows() result.
    ctx.use_container = onWindows()

    tool_factory = cwltool.factory.Factory(runtime_context=ctx)
    cat_tool = tool_factory.make(get_data("tests/wf/cat-tool.cwl"))

    whale_file = {"class": "File", "location": get_data("tests/wf/whale.txt")}
    output = cat_tool(file1=whale_file, reverse=False)

    assert isinstance(output, dict)
    result = output["output"]
    assert isinstance(result, dict)
    assert result["checksum"] == "sha1$327fc7aedf4f6b69a42a7c8b808dc5a7aff61376"
def test_check_version() -> None:
    """
    Loading a document without updating it is allowed; executing it is not.

    Iterating the jobs of a tool loaded with ``do_update`` disabled must raise
    a WorkflowException, while the updated tool iterates without error.
    """
    job_order = {"inp": "abc"}  # type: CWLObjectType

    def load_echo(do_update: bool):
        """Load tests/echo.cwl with the requested ``do_update`` setting."""
        context = LoadingContext({"do_update": do_update})
        return load_tool(get_data("tests/echo.cwl"), context)

    # Updated document: consuming every job must succeed.
    updated_tool = load_echo(True)
    for _ in updated_tool.job(job_order, None, RuntimeContext()):
        pass

    # Non-updated document: loading succeeds, job iteration must fail.
    stale_tool = load_echo(False)
    with pytest.raises(WorkflowException):
        for _ in stale_tool.job(job_order, None, RuntimeContext()):
            pass
# Fragment of a method body (its `def` line and original indentation are not
# part of this excerpt). It prepares cwltool runtime arguments, runs the
# step's embedded tool through a SingleJobExecutor and checks the outcome.
# `collected_outputs` and `job` are defined outside the visible lines.
#
# NOTE(review): `kwargs` is bound to the same object as
# `self.dag.default_args` (no copy is made), so each item assignment below is
# also stored in the DAG defaults - confirm this is intended.
kwargs = self.dag.default_args
tmp_folder = collected_outputs["tmp_folder"]
# `output_folder` is read here but not referenced again in the visible lines.
output_folder = collected_outputs["output_folder"]
# mkdtemp creates these two directories under tmp_folder immediately.
kwargs['outdir'] = tempfile.mkdtemp(dir=tmp_folder, prefix="step_tmp_")
kwargs['tmpdir_prefix'] = tempfile.mkdtemp(dir=tmp_folder, prefix="cwl_tmp_")
# Only a path prefix string; nothing is created on disk by this line.
kwargs['tmp_outdir_prefix'] = os.path.join(tmp_folder, "cwl_outdir_tmp_")
kwargs['rm_tmpdir'] = False
# Absolute directory part of default_args["job_data"]["path"].
kwargs["basedir"] = os.path.abspath(os.path.dirname(self.dag.default_args["job_data"]["path"]))
logger = logging.getLogger("cwltool")
# Process-wide stdout/stderr are replaced with writers built from the
# "cwltool" logger (INFO and WARN levels respectively).
# NOTE(review): they are not restored anywhere in the visible lines.
sys.stdout = StreamLogWriterUpdated(logger, logging.INFO)
sys.stderr = StreamLogWriterUpdated(logger, logging.WARN)
executor = cwltool.executors.SingleJobExecutor()
runtimeContext = RuntimeContext(kwargs)
# Presumably keeps an already-set make_fs_access and otherwise falls back to
# StdFsAccess - confirm against getdefault().
runtimeContext.make_fs_access = getdefault(runtimeContext.make_fs_access, cwltool.stdfsaccess.StdFsAccess)
# Remove from the job every input flagged "not_connected"; the job key is
# shortname() applied to the last "/"-separated segment of the input id.
for inp in self.cwl_step.tool["inputs"]:
if inp.get("not_connected"):
del job[shortname(inp["id"].split("/")[-1])]
(output, status) = executor(self.cwl_step.embedded_tool,
job,
runtimeContext,
logger=logger)
# A falsy output combined with "permanentFail" is reported as a ValueError
# that carries no message.
if not output and status == "permanentFail":
raise ValueError
# This call goes through the root `logging` module, not the `logger` object
# created above.
logging.debug('Embedded tool outputs: \n{}'.format(json.dumps(output, indent=4)))
# Fragment of a method body (its `def` line and original indentation are not
# part of this excerpt). It finalises the job object, fills in runtime
# arguments, runs the step through a SingleJobExecutor and checks the outcome.
# `jobobj`, `_d_args` and `it_is_workflow` are defined outside the visible lines.
job = _post_scatter_eval(jobobj, self.cwl_step)
_logger.info('{0}: Final job data: \n {1}'.format(self.task_id, json.dumps(job, indent=4)))
# mkdtemp creates a directory whose path starts with <self.outdir>/step_tmp.
_d_args['outdir'] = tempfile.mkdtemp(prefix=os.path.join(self.outdir, "step_tmp"))
# Both prefixes below are plain path strings located inside that directory.
_d_args['tmpdir_prefix'] = os.path.join(_d_args['outdir'], 'cwl_tmp_')
_d_args['tmp_outdir_prefix'] = os.path.join(_d_args['outdir'], 'cwl_outdir_')
# Container-id recording is switched on; the cidfile directory is the step
# outdir and the cidfile prefix is this task's id.
_d_args["record_container_id"] = True
_d_args["cidfile_dir"] = _d_args['outdir']
_d_args["cidfile_prefix"] = self.task_id
# NOTE(review): this message formats `self`, whereas the info message above
# formats `self.task_id` - confirm which one is wanted.
_logger.debug(
'{0}: Runtime context: \n {1}'.format(self, _d_args))
executor = SingleJobExecutor()
runtimeContext = RuntimeContext(_d_args)
# Presumably keeps an already-set make_fs_access and otherwise falls back to
# StdFsAccess - confirm against getdefault().
runtimeContext.make_fs_access = getdefault(runtimeContext.make_fs_access, StdFsAccess)
# Remove from the job every input flagged "not_connected"; the job key is
# shortname() applied to the last "/"-separated segment of the input id.
for inp in self.cwl_step.tool["inputs"]:
if inp.get("not_connected"):
del job[shortname(inp["id"].split("/")[-1])]
# sys.stderr is pointed at the interpreter's original stream for the duration
# of the executor call and put back afterwards.
# NOTE(review): no try/finally is visible here, so if the executor call raises
# the restore line below is skipped - confirm against the full method.
_stderr = sys.stderr
sys.stderr = sys.__stderr__
# The embedded tool is executed when `it_is_workflow` is truthy, otherwise
# the step object itself.
(output, status) = executor(self.cwl_step.embedded_tool if it_is_workflow else self.cwl_step,
job,
runtimeContext,
logger=_logger)
sys.stderr = _stderr
# A falsy output combined with "permanentFail" is reported as a ValueError
# that carries no message.
if not output and status == "permanentFail":
raise ValueError
# Fragment of a function body (its `def` line and original indentation are not
# part of this excerpt). It derives the cwltool runtime and loading contexts
# from the parsed `options` and then enters a Toil context manager.
# `workdir` is defined outside the visible lines.
#
# `outdir`, `tmp_outdir_prefix`, `fileindex` and `existing` are assigned here
# but not referenced again in the visible lines.
outdir = os.path.abspath(options.outdir)
tmp_outdir_prefix = os.path.abspath(options.tmp_outdir_prefix)
fileindex = {}
existing = {}
# Both option lookups default to None when the attribute is absent.
conf_file = getattr(options,
"beta_dependency_resolvers_configuration", None)
use_conda_dependencies = getattr(options, "beta_conda_dependencies", None)
# A DependenciesConfiguration is built only when one of the two options above
# is truthy; otherwise job_script_provider stays None.
job_script_provider = None
if conf_file or use_conda_dependencies:
dependencies_configuration = DependenciesConfiguration(options)
job_script_provider = dependencies_configuration
# Cleared before vars(options) is taken, so the contexts built below receive
# default_container=None.
options.default_container = None
runtime_context = cwltool.context.RuntimeContext(vars(options))
# find_default_container is pre-bound to `options` as its first argument.
runtime_context.find_default_container = functools.partial(
find_default_container, options)
runtime_context.workdir = workdir
runtime_context.move_outputs = "leave"
runtime_context.rm_tmpdir = False
loading_context = cwltool.context.LoadingContext(vars(options))
# With provenance enabled, a ResearchObject is created and attached to the
# runtime context. It is given options.tmp_outdir_prefix as passed in, not the
# absolutised local computed at the top of this fragment.
if options.provenance:
research_obj = cwltool.provenance.ResearchObject(
temp_prefix_ro=options.tmp_outdir_prefix, orcid=options.orcid,
full_name=options.cwl_full_name,
fsaccess=runtime_context.make_fs_access(''))
runtime_context.research_obj = research_obj
# The bodies of the two statements below are cut off in this excerpt.
with Toil(options) as toil:
if options.restart:
def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
"""Initialize the RuntimeContext from the kwargs."""
select_resources_callable = Callable[ # pylint: disable=unused-variable
[Dict[str, Union[int, float]], RuntimeContext], Dict[str, Union[int, float]]
]
self.user_space_docker_cmd = "" # type: Optional[str]
self.secret_store = None # type: Optional[SecretStore]
self.no_read_only = False # type: bool
self.custom_net = "" # type: Optional[str]
self.no_match_user = False # type: bool
self.preserve_environment = "" # type: Optional[Iterable[str]]
self.preserve_entire_environment = False # type: bool
self.use_container = True # type: bool
self.force_docker_pull = False # type: bool
self.tmp_outdir_prefix = DEFAULT_TMP_PREFIX # type: str
self.tmpdir_prefix = DEFAULT_TMP_PREFIX # type: str
self.tmpdir = "" # type: str
self.rm_tmpdir = True # type: bool
self.pull_image = True # type: bool