# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def v1_1to1_2(
    doc: CommentedMap, loader: Loader, baseuri: str
) -> Tuple[CommentedMap, str]:  # pylint: disable=unused-argument
    """Public updater for v1.1 to v1.2."""
    doc = copy.deepcopy(doc)
    # When the document is a $graph container, the processes to update live
    # under the "$graph" key; otherwise the document itself is the process.
    target = doc
    if isinstance(target, MutableMapping) and "$graph" in target:
        target = cast(CommentedMap, target["$graph"])
    for process in aslist(target):
        # Drop any per-process cwlVersion so the new top-level version wins.
        process.pop("cwlVersion", None)
    return doc, "v1.2.0-dev1"
# NOTE(review): fragment — the `def` line of this updater is missing from
# this chunk; only the tail of the signature onward is visible.
) -> Tuple[CommentedMap, str]: # pylint: disable=unused-argument
    """Public updater for v1.2.0-dev2 to v1.2.0-dev3."""
    doc = copy.deepcopy(doc)

    def update_pickvalue(t: CWLObjectType) -> None:
        # Rename the pickValue option "only_non_null" to its dev3 spelling
        # "the_only_non_null" in every workflow step input.
        for step in cast(MutableSequence[CWLObjectType], t["steps"]):
            for inp in cast(MutableSequence[CWLObjectType], step["in"]):
                if "pickValue" in inp:
                    if inp["pickValue"] == "only_non_null":
                        inp["pickValue"] = "the_only_non_null"

    visit_class(doc, "Workflow", update_pickvalue)
    # Strip per-process cwlVersion markers (from the $graph entries when the
    # document is a graph, otherwise from the document itself).
    upd = doc
    if isinstance(upd, MutableMapping) and "$graph" in upd:
        upd = cast(CommentedMap, upd["$graph"])
    for proc in aslist(upd):
        if "cwlVersion" in proc:
            del proc["cwlVersion"]
    return (doc, "v1.2.0-dev3")
def check_format(
    actual_file: Union[CWLObjectType, List[CWLObjectType]],
    input_formats: Union[List[str], str],
    ontology: Optional[Graph],
) -> None:
    """Confirm that the format present is valid for the allowed formats.

    :param actual_file: a File object, or a list of File objects, whose
        ``format`` field is validated.
    :param input_formats: the format URI(s) accepted by the input parameter.
    :param ontology: optional RDF graph consulted by ``formatSubclassOf``
        to resolve format subclass relationships.
    :raises ValidationException: if a file has no ``format`` field, or if its
        format neither equals nor is a subclass of any allowed format.
    """
    for afile in aslist(actual_file):
        if not afile:
            # Skip empty/None entries in a file array.
            continue
        if "format" not in afile:
            raise ValidationException(
                "File has no 'format' defined: {}".format(json_dumps(afile, indent=4))
            )
        for inpf in aslist(input_formats):
            if afile["format"] == inpf or formatSubclassOf(
                afile["format"], inpf, ontology, set()
            ):
                # This file is acceptable; check the next one.
                # (Previously this was `return`, which ended validation at the
                # first acceptable file and left the rest of the list unchecked.)
                break
        else:
            # No allowed format matched this file.
            raise ValidationException(
                "File has an incompatible format: {}".format(json_dumps(afile, indent=4))
            )
# NOTE(review): fragment — the start of this output-collection routine
# (including the call these closing parentheses belong to) is outside
# this chunk, and the chunk is cut off again mid-call at the end.
            globpatterns
        )
    )
elif not result and optional:
    # An absent result is acceptable for an optional output.
    pass
elif isinstance(result, MutableSequence):
    if len(result) > 1:
        raise WorkflowException(
            "Multiple matches for output item that is a single file."
        )
    else:
        result = cast(CWLOutputType, result[0])
if "secondaryFiles" in schema:
    with SourceLine(schema, "secondaryFiles", WorkflowException, debug):
        for primary in aslist(result):
            if isinstance(primary, MutableMapping):
                # Ensure a secondaryFiles list exists before filling it in.
                primary.setdefault("secondaryFiles", [])
                # Directory portion of the primary path, incl. trailing sep.
                pathprefix = primary["path"][
                    0 : primary["path"].rindex(os.sep) + 1
                ]
                for sf in aslist(schema["secondaryFiles"]):
                    if "required" in sf:
                        # "required" may be an expression; evaluate it
                        # in the context of the primary file.
                        sf_required = builder.do_eval(
                            sf["required"], context=primary
                        )
                    else:
                        sf_required = False
                    if "$(" in sf["pattern"] or "${" in sf["pattern"]:
                        # Pattern is an expression; evaluate it.
                        sfpath = builder.do_eval(
                            sf["pattern"], context=primary
# NOTE(review): chunk cut off here, mid-call.
# NOTE(review): fragment — tail of an updater returning "v1.1" (presumably
# v1.0 -> v1.1; the enclosing `def` and the headers of the nested helpers
# referenced below are outside this chunk).
                    _logger.warning(
                        SourceLine(ib, k).makeError(
                            "Will ignore field '{}' which is not valid in {} "
                            "inputBinding".format(k, t["class"])
                        )
                    )
                    del ib[k]

    visit_class(doc, ("CommandLineTool", "Workflow"), rewrite_requirements)
    visit_class(doc, ("ExpressionTool", "Workflow"), fix_inputBinding)
    visit_field(doc, "secondaryFiles", partial(update_secondaryFiles, top=True))
    upd = doc
    if isinstance(upd, MutableMapping) and "$graph" in upd:
        upd = cast(CommentedMap, upd["$graph"])
    for proc in aslist(upd):
        # Prepend explicit NetworkAccess and LoadListingRequirement hints;
        # presumably these make the older version's implicit defaults
        # explicit for v1.1 — confirm against the CWL v1.1 spec.
        proc.setdefault("hints", CommentedSeq())
        proc["hints"].insert(
            0, CommentedMap([("class", "NetworkAccess"), ("networkAccess", True)])
        )
        proc["hints"].insert(
            0,
            CommentedMap(
                [("class", "LoadListingRequirement"), ("loadListing", "deep_listing")]
            ),
        )
        if "cwlVersion" in proc:
            del proc["cwlVersion"]
    return (doc, "v1.1")
# NOTE(review): fragment of a workflow-scheduling method — the enclosing
# method header is missing, and the chunk is spliced (a span is missing
# mid-way through) and cut off at the end.
alloutputs_fufilled = False
while not alloutputs_fufilled:
    # Iteratively go over the workflow steps, scheduling jobs as their
    # dependencies can be fulfilled by upstream workflow inputs or
    # step outputs. Loop exits when the workflow outputs
    # are satisfied.
    alloutputs_fufilled = True
    for step in self.cwlwf.steps:
        if step.tool["id"] not in jobs:
            # A step is ready only when every one of its sources has a
            # promise recorded already.
            stepinputs_fufilled = True
            for inp in step.tool["inputs"]:
                if "source" in inp:
                    for s in aslist(inp["source"]):
                        if s not in promises:
                            stepinputs_fufilled = False
            if stepinputs_fufilled:
                jobobj = {}
                for inp in step.tool["inputs"]:
                    key = shortname(inp["id"])
                    if "source" in inp:
                        inpSource = inp["source"]
                        if inp.get("linkMerge") \
                                or len(aslist(inp["source"])) > 1:
                            # Multiple sources (or explicit linkMerge):
                            # merge them into one value.
                            jobobj[key] =\
                                _link_merge_source(promises, inp, inpSource)
                        else:
                            if isinstance(inpSource, MutableSequence):
                                # It seems that an input source with a
# NOTE(review): chunk spliced here; the lines below belong to a later part
# of the same routine (wiring job dependency edges).
                            if (not isinstance(
                                    promises[s], (CWLJobWrapper, CWLGather)
                            ) and not promises[s].hasChild(wfjob)):
                                promises[s].addChild(wfjob)
                                connected = True
            if not connected:
                # the workflow step has default inputs only & isn't
                # connected to other jobs, so add it as child of
                # this workflow.
                self.addChild(wfjob)
            for out in step.tool["outputs"]:
                # Record the follow-on job as the promise for each output.
                promises[out["id"]] = followOn
        for inp in step.tool["inputs"]:
            for source in aslist(inp.get("source", [])):
                if source not in promises:
                    alloutputs_fufilled = False
    # may need a test
    for out in self.cwlwf.tool["outputs"]:
        if "source" in out:
            if out["source"] not in promises:
                alloutputs_fufilled = False
# Collect the workflow-level outputs from the recorded promises.
outobj = {}
for out in self.cwlwf.tool["outputs"]:
    key = shortname(out["id"])
    if out.get("linkMerge") or len(aslist(out["outputSource"])) > 1:
        outobj[key] = _link_merge_source(promises, out, out["outputSource"])
    else:
# NOTE(review): chunk cut off here.
# A CommentedSeq of length one still appears here rarely -
# NOTE(review): fragment — the enclosing routine's header is outside this
# chunk; this span fills in missing secondaryFiles entries for `datum`.
if "secondaryFiles" not in datum:
    datum["secondaryFiles"] = []
for sf in aslist(schema["secondaryFiles"]):
    if "required" in sf:
        # "required" may be an expression; evaluate it against the file.
        sf_required = self.do_eval(sf["required"], context=datum)
    else:
        sf_required = True
    if "$(" in sf["pattern"] or "${" in sf["pattern"]:
        # Pattern is an expression; evaluate it.
        sfpath = self.do_eval(sf["pattern"], context=datum)
    else:
        # Plain pattern; apply it to the primary file's basename.
        sfpath = substitute(
            cast(str, datum["basename"]), sf["pattern"]
        )
    for sfname in aslist(sfpath):
        if not sfname:
            continue
        found = False
        if isinstance(sfname, str):
            d_location = cast(str, datum["location"])
            if "/" in d_location:
                # Secondary file sits next to the primary file.
                sf_location = (
                    d_location[0 : d_location.rindex("/") + 1]
                    + sfname
                )
            else:
                sf_location = d_location + sfname
            sfbasename = sfname
        elif isinstance(sfname, MutableMapping):
            sf_location = sfname["location"]
# NOTE(review): chunk spliced here; the lines below belong to a different
# routine that resolves secondaryFiles for `primary` via `builder`.
            sf_required = builder.do_eval(
                sf["required"], context=primary
            )
        else:
            sf_required = False
        if "$(" in sf["pattern"] or "${" in sf["pattern"]:
            sfpath = builder.do_eval(
                sf["pattern"], context=primary
            )
        else:
            sfpath = substitute(
                primary["basename"], sf["pattern"]
            )
        for sfitem in aslist(sfpath):
            if not sfitem:
                continue
            if isinstance(sfitem, str):
                # Bare string: treat as a path beside the primary file.
                sfitem = {"path": pathprefix + sfitem}
            if (
                not fs_access.exists(sfitem["path"])
                and sf_required
            ):
                raise WorkflowException(
                    "Missing required secondary file '%s'"
                    % (sfitem["path"])
                )
            if "path" in sfitem and "location" not in sfitem:
                # Map the local path back to a location URI.
                revmap(sfitem)
            if fs_access.isfile(sfitem["location"]):
                sfitem["class"] = "File"
def get_subgraph(roots: MutableSequence[str], tool: Workflow) -> CommentedMap:
    """Build a dependency graph over the workflow's inputs, outputs and steps.

    NOTE(review): this definition continues past the end of this chunk; only
    the graph-construction part is visible here, and `roots` is not yet used
    in the visible span — presumably the rest extracts the subgraph needed
    to produce `roots`; confirm against the full file.
    """
    if tool.tool["class"] != "Workflow":
        raise Exception("Can only extract subgraph from workflow")
    nodes = {}  # type: Dict[str, Node]
    # Register every workflow-level input and output as a graph node.
    for inp in tool.tool["inputs"]:
        declare_node(nodes, inp["id"], INPUT)
    for out in tool.tool["outputs"]:
        declare_node(nodes, out["id"], OUTPUT)
        for i in aslist(out.get("outputSource", [])):
            # source is upstream from output (dependency)
            nodes[out["id"]].up.append(i)
            # output is downstream from source
            declare_node(nodes, i, None)
            nodes[i].down.append(out["id"])
    # Register each step and link it to its input sources.
    for st in tool.tool["steps"]:
        step = declare_node(nodes, st["id"], STEP)
        for i in st["in"]:
            if "source" not in i:
                continue
            for src in aslist(i["source"]):
                # source is upstream from step (dependency)
                step.up.append(src)
                # step is downstream from source
                declare_node(nodes, src, None)