Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def check_for_abstract_op(tool: CWLObjectType) -> None:
    """Raise if *tool* is an abstract ``Operation``.

    Objects whose ``"class"`` entry is anything other than ``"Operation"``
    pass through untouched.  For an ``Operation`` the error is built through
    ``SourceLine`` on the ``"class"`` key, using ``WorkflowException`` as the
    exception type.
    """
    if tool["class"] != "Operation":
        return
    located = SourceLine(tool, "class", WorkflowException)
    raise located.makeError("Workflow has unrunnable abstract Operation")
# Hand the abstract-Operation check to process.visit so an unrunnable
# Operation raises before any of the setup below happens.
process.visit(check_for_abstract_op)
# Absolute form of the requested output directory; stays None unless
# runtime_context.outdir is a string.
finaloutdir = None # type: Optional[str]
original_outdir = runtime_context.outdir
if isinstance(original_outdir, str):
finaloutdir = os.path.abspath(original_outdir)
# Rebind to a copy before changing settings (presumably so the caller's
# context object is left as it was -- depends on what copy() does).
runtime_context = runtime_context.copy()
# Point the run at a freshly created temporary directory and record that
# directory in self.output_dirs.
outdir = tempfile.mkdtemp(
prefix=getdefault(runtime_context.tmp_outdir_prefix, DEFAULT_TMP_PREFIX)
)
self.output_dirs.add(outdir)
runtime_context.outdir = outdir
# Fresh per-run helpers: a new MutationManager, the toplevel flag, and a
# condition variable backed by a re-entrant lock.
runtime_context.mutation_manager = MutationManager()
runtime_context.toplevel = True
runtime_context.workflow_eval_lock = threading.Condition(threading.RLock())
job_reqs = None # type: Optional[List[CWLObjectType]]
if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
if (
process.metadata.get("http://commonwl.org/cwltool#original_cwlVersion")
== "v1.0"
):
raise WorkflowException(
"`cwl:requirements` in the input object is not part of CWL "
"v1.0. You can adjust to use `cwltool:overrides` instead; or you "
args: argparse.Namespace,
) -> LoadingContext:
# With no context supplied, build one from the parsed command-line options;
# otherwise work on a copy of the supplied one.
if loadingContext is None:
loadingContext = LoadingContext(vars(args))
else:
loadingContext = loadingContext.copy()
# The loader is created from the context's fetcher constructor plus the
# enable_dev and doc_cache command-line options.
loadingContext.loader = default_loader(
loadingContext.fetcher_constructor,
enable_dev=args.enable_dev,
doc_cache=args.doc_cache,
)
loadingContext.research_obj = runtimeContext.research_obj
# JavaScript validation is off when disabled explicitly, and also whenever
# args.do_validate is false.
loadingContext.disable_js_validation = args.disable_js_validation or (
not args.do_validate
)
# getdefault supplies the fallback for settings the context does not carry
# (assumed: it returns its second argument when the first is None -- confirm).
loadingContext.construct_tool_object = getdefault(
loadingContext.construct_tool_object, workflow.default_make_tool
)
loadingContext.resolver = getdefault(loadingContext.resolver, tool_resolver)
# Only decide do_update when it is still None: it becomes False when either
# args.pack or args.print_subgraph is set, True otherwise.
if loadingContext.do_update is None:
loadingContext.do_update = not (args.pack or args.print_subgraph)
return loadingContext
# Shell form (the condition selecting this branch is above this excerpt):
# every binding's argument is collected into one list, joined with spaces and
# run as a single string through /bin/sh -c.
cmd = [] # type: List[str]
for b in builder.bindings:
arg = builder.generate_arg(b)
# Arguments are shell-quoted unless the binding sets "shellQuote" to a false
# value; a missing key counts as True.
if b.get("shellQuote", True):
arg = [shellescape.quote(a) for a in aslist(arg)]
cmd.extend(aslist(arg))
j.command_line = ["/bin/sh", "-c", " ".join(cmd)]
else:
# Non-shell form: the generated arguments are flattened into the argv list.
j.command_line = flatten(list(map(builder.generate_arg, builder.bindings)))
j.pathmapper = builder.pathmapper
# Output collection is pre-bound with this tool's output definitions and the
# builder; compute_checksum falls back to True via getdefault.
j.collect_outputs = partial(
self.collect_output_ports,
self.tool["outputs"],
builder,
compute_checksum=getdefault(runtimeContext.compute_checksum, True),
jobname=jobname,
readers=readers,
)
j.output_callback = output_callbacks
mpi, _ = self.get_requirement(MPIRequirementName)
if mpi is not None:
np = cast( # From the schema for MPIRequirement.processes
Union[int, str],
mpi.get("processes", runtimeContext.mpi_config.default_nproc),
)
if isinstance(np, str):
tmp = builder.do_eval(np)
if not isinstance(tmp, int):
raise TypeError(
# Replace the context's requirements with a deep copy before extending, so the
# list object previously stored there is not the one being appended to.
loadingContext.requirements = copy.deepcopy(
getdefault(loadingContext.requirements, [])
)
assert loadingContext.requirements is not None # nosec
# Append order: the step's own "requirements" first, then any "requirements"
# found in the overrides looked up for self.id.
loadingContext.requirements.extend(toolpath_object.get("requirements", []))
loadingContext.requirements.extend(
cast(
List[CWLObjectType],
get_overrides(
getdefault(loadingContext.overrides_list, []), self.id
).get("requirements", []),
)
)
# Hints get the same treatment: deep copy, extend with the step's own "hints",
# then store the result back on the context.
hints = copy.deepcopy(getdefault(loadingContext.hints, []))
hints.extend(toolpath_object.get("hints", []))
loadingContext.hints = hints
try:
if isinstance(toolpath_object["run"], CommentedMap):
self.embedded_tool = loadingContext.construct_tool_object(
toolpath_object["run"], loadingContext
) # type: Process
else:
loadingContext.metadata = {}
self.embedded_tool = load_tool(toolpath_object["run"], loadingContext)
except ValidationException as vexc:
if loadingContext.debug:
_logger.exception("Validation exception")
raise WorkflowException(
"Tool definition %s failed validation:\n%s"
# On macOS, swap the temporary-directory prefixes to /private/tmp/docker_tmp,
# but only where they still hold DEFAULT_TMP_PREFIX.
if sys.platform == "darwin":
default_mac_path = "/private/tmp/docker_tmp"
if runtimeContext.tmp_outdir_prefix == DEFAULT_TMP_PREFIX:
runtimeContext.tmp_outdir_prefix = default_mac_path
if runtimeContext.tmpdir_prefix == DEFAULT_TMP_PREFIX:
runtimeContext.tmpdir_prefix = default_mac_path
# Any non-None result from check_working_directories ends this function with
# return value 1.
if check_working_directories(runtimeContext) is not None:
return 1
# With a cache directory, a requested "move" of outputs is changed to "copy"
# and the cache directory is used as tmp_outdir_prefix
# (NOTE(review): presumably so cached results stay in place -- confirm).
if args.cachedir:
if args.move_outputs == "move":
runtimeContext.move_outputs = "copy"
runtimeContext.tmp_outdir_prefix = args.cachedir
# Fill in a SecretStore instance and StdFsAccess via getdefault.
runtimeContext.secret_store = getdefault(
runtimeContext.secret_store, SecretStore()
)
runtimeContext.make_fs_access = getdefault(
runtimeContext.make_fs_access, StdFsAccess
)
# Executor choice: a supplied executor is used as-is; otherwise args.parallel
# selects MultithreadedJobExecutor (whose select_resources is also stored on
# the runtime context), and SingleJobExecutor is used when it is not set.
if not executor:
if args.parallel:
temp_executor = MultithreadedJobExecutor()
runtimeContext.select_resources = temp_executor.select_resources
real_executor = temp_executor # type: JobExecutor
else:
real_executor = SingleJobExecutor()
else:
real_executor = executor
# Statically validate data links instead of doing it at runtime.
workflow_inputs = self.tool["inputs"]
workflow_outputs = self.tool["outputs"]
# Flat lists of every step's inputs and outputs, plus a lookup from each
# input/output "id" to the tool object of the step it belongs to.
step_inputs = [] # type: List[CWLObjectType]
step_outputs = [] # type: List[CWLObjectType]
param_to_step = {} # type: Dict[str, CWLObjectType]
for step in self.steps:
step_inputs.extend(step.tool["inputs"])
step_outputs.extend(step.tool["outputs"])
for s in step.tool["inputs"]:
param_to_step[s["id"]] = step.tool
for s in step.tool["outputs"]:
param_to_step[s["id"]] = step.tool
# The check runs unless do_validate is false; getdefault falls back to True
# here.
if getdefault(loadingContext.do_validate, True):
static_checker(
workflow_inputs,
workflow_outputs,
step_inputs,
step_outputs,
param_to_step,
)
raise WorkflowException(
SourceLine(docker_req).makeError(
"Docker image must be specified as 'dockerImageId' or "
"'dockerPull' when using user space implementations of "
"Docker"
)
)
else:
try:
# Resolve the image id from the Docker requirement when containers are in use.
# NOTE(review): the result is wrapped in str(), so a None return would become
# the string "None" and the `img_id is None` checks below could not trigger on
# this path -- confirm get_from_requirements never returns None here.
if docker_req is not None and runtimeContext.use_container:
img_id = str(
self.get_from_requirements(
docker_req,
runtimeContext.pull_image,
getdefault(runtimeContext.force_docker_pull, False),
getdefault(
runtimeContext.tmp_outdir_prefix, DEFAULT_TMP_PREFIX
),
)
)
# Still no image id: ask the builder's find_default_container hook, when one
# is set, and use its result if it is truthy.
if img_id is None:
if self.builder.find_default_container:
default_container = self.builder.find_default_container()
if default_container:
img_id = str(default_container)
# A Docker requirement with containers enabled but no image id is an error.
if (
docker_req is not None
and img_id is None
and runtimeContext.use_container
):
raise Exception("Docker image not available")
# (Branch of a conditional whose opening lies above this excerpt.)
else:
# Output directory preference: the requirement's "dockerOutputDirectory", then
# runtime_context.docker_outdir, then random_outdir().
outdir = cast(
str,
docker_req.get("dockerOutputDirectory")
or runtime_context.docker_outdir
or random_outdir(),
)
elif default_docker is not None:
outdir = runtime_context.docker_outdir or random_outdir()
# Fixed fallback paths are used when the runtime context does not set
# docker_tmpdir / docker_stagedir.
tmpdir = runtime_context.docker_tmpdir or "/tmp" # nosec
stagedir = runtime_context.docker_stagedir or "/var/lib/cwl"
else:
# Remaining case: paths are resolved through fs_access.realpath, and a
# temporary directory is created whenever the context supplies none.
outdir = fs_access.realpath(
runtime_context.outdir
or tempfile.mkdtemp(
prefix=getdefault(
runtime_context.tmp_outdir_prefix, DEFAULT_TMP_PREFIX
)
)
)
# tmpdir and stagedir are only resolved for processes that are not a Workflow.
if self.tool["class"] != "Workflow":
tmpdir = fs_access.realpath(
runtime_context.tmpdir or tempfile.mkdtemp()
)
stagedir = fs_access.realpath(
runtime_context.stagedir or tempfile.mkdtemp()
)
# Original CWL version as recorded in the metadata. cast() performs no runtime
# conversion, so the value is None whenever the key is absent.
cwl_version = cast(
str,
self.metadata.get("http://commonwl.org/cwltool#original_cwlVersion", None),
)
# Directory schema entry, looked up in the index of the cached "v1.0" schema.
SCHEMA_DIR = cast(
CWLObjectType,
SCHEMA_CACHE["v1.0"][3].idx["https://w3id.org/cwl/cwl#Directory"],
)
# Schema names built from SCHEMA_FILE, SCHEMA_DIR and SCHEMA_ANY.
self.names = make_avro_schema([SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY], Loader({}))
self.tool = toolpath_object
# Requirements are layered in this order: a deep copy of those on the loading
# context, the tool's own, then any overrides found for the tool id.
self.requirements = copy.deepcopy(getdefault(loadingContext.requirements, []))
self.requirements.extend(self.tool.get("requirements", []))
# The override lookup below is keyed on the tool id, so a tool without one is
# first given a generated "_:<uuid4>" id.
if "id" not in self.tool:
self.tool["id"] = "_:" + str(uuid.uuid4())
self.requirements.extend(
cast(
List[CWLObjectType],
get_overrides(
getdefault(loadingContext.overrides_list, []), self.tool["id"]
).get("requirements", []),
)
)
# Hints: deep copy of the context's hints, extended with the tool's own.
self.hints = copy.deepcopy(getdefault(loadingContext.hints, []))
self.hints.extend(self.tool.get("hints", []))
# Versions of requirements and hints which aren't mutated.
self.original_requirements = copy.deepcopy(self.requirements)
self.original_hints = copy.deepcopy(self.hints)
self.doc_loader = loadingContext.loader
self.doc_schema = loadingContext.avsc_names
# formatgraph stays None unless a document loader is available.
self.formatgraph = None # type: Optional[Graph]
if self.doc_loader is not None:
self.formatgraph = self.doc_loader.graph
checkRequirements(self.tool, supportedProcessRequirements)