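# The snippets below exercise cwltool's document-loading and packing APIs. They
# are shown without their surrounding imports; a plausible import block for the
# test-style snippets (module locations have moved between cwltool releases, so
# treat this as an assumption rather than any project's actual header) would be:
import json
import os
import tempfile
from functools import partial
from io import StringIO
from tempfile import NamedTemporaryFile

import py  # py.path.local appears in one test signature below
from ruamel import yaml  # yaml.round_trip_load() below is the older ruamel API

import cwltool.pack
from cwltool.context import LoadingContext
from cwltool.load_tool import fetch_document, resolve_and_validate_document
from cwltool.main import main, make_relative, print_pack
from cwltool.resolver import tool_resolver
from cwltool.utils import adjustFileObjs  # lives in cwltool.pathmapper in older releases

# get_data() is a helper from cwltool's own test suite that resolves paths to
# bundled test fixtures.
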
def test_pack_input_named_name() -> None:
loadingContext, workflowobj, uri = fetch_document(
get_data("tests/wf/trick_revsort.cwl")
)
loadingContext.do_update = False
loadingContext, uri = resolve_and_validate_document(
loadingContext, workflowobj, uri
)
loader = loadingContext.loader
assert loader
processobj = loader.resolve_ref(uri)[0]
with open(get_data("tests/wf/expect_trick_packed.cwl")) as packed_file:
expect_packed = yaml.round_trip_load(packed_file)
packed = cwltool.pack.pack(loadingContext, uri)
    adjustFileObjs(
        packed, partial(make_relative, os.path.abspath(get_data("tests/wf")))
    )

def test_pack_single_tool() -> None:
loadingContext, workflowobj, uri = fetch_document(
get_data("tests/wf/formattest.cwl")
)
loadingContext.do_update = False
loadingContext, uri = resolve_and_validate_document(
loadingContext, workflowobj, uri
)
loader = loadingContext.loader
assert loader
processobj = loader.resolve_ref(uri)[0]
packed = cwltool.pack.pack(loadingContext, uri)
assert "$schemas" in packed
def test_packed_workflow_execution(
wf_path: str, job_path: str, namespaced: bool, tmpdir: py.path.local
) -> None:
loadingContext = LoadingContext()
loadingContext.resolver = tool_resolver
loadingContext, workflowobj, uri = fetch_document(get_data(wf_path), loadingContext)
loadingContext.do_update = False
loadingContext, uri = resolve_and_validate_document(
loadingContext, workflowobj, uri
)
loader = loadingContext.loader
assert loader
processobj = loader.resolve_ref(uri)[0]
packed = json.loads(print_pack(loadingContext, uri))
assert not namespaced or "$namespaces" in packed
wf_packed_handle, wf_packed_path = tempfile.mkstemp()
with open(wf_packed_path, "w") as temp_file:
json.dump(packed, temp_file)
normal_output = StringIO()
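
# The test above is truncated at this point; it goes on to execute both the
# original and the packed document. A small, hypothetical helper in the same
# spirit, showing how a packed file can be run through cwltool's programmatic
# entry point (paths and the argument list are illustrative assumptions):
def run_packed(packed_path: str, job_path: str, outdir: str) -> int:
    output = StringIO()
    exit_code = main(["--outdir", outdir, packed_path, job_path], stdout=output)
    print(output.getvalue())
    return exit_code
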
def _pack_idempotently(document: str) -> None:
loadingContext, workflowobj, uri = fetch_document(get_data(document))
loadingContext.do_update = False
loadingContext, uri = resolve_and_validate_document(
loadingContext, workflowobj, uri
)
loader = loadingContext.loader
assert loader
processobj = loader.resolve_ref(uri)[0]
# generate pack output dict
packed_text = print_pack(loadingContext, uri)
packed = json.loads(packed_text)
tmp = NamedTemporaryFile(mode="w", delete=False)
try:
tmp.write(packed_text)
tmp.flush()
tmp.close()
loadingContext, workflowobj, uri2 = fetch_document(tmp.name)
loadingContext.do_update = False
loadingContext, uri2 = resolve_and_validate_document(
loadingContext, workflowobj, uri2
)
loader2 = loadingContext.loader
assert loader2
processobj = loader2.resolve_ref(uri2)[0]
# generate pack output dict
packed_text = print_pack(loadingContext, uri2)
double_packed = json.loads(packed_text)
finally:
os.remove(tmp.name)
assert uri != uri2
assert packed == double_packed
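
# _pack_idempotently() can then back tests asserting that packing an already
# packed document yields an identical result; the fixture path is illustrative.
def test_pack_idempotence_tool() -> None:
    _pack_idempotently("tests/wf/hello_single_tool.cwl")
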
loading_context.construct_tool_object = toil_make_tool
loading_context.resolver = cwltool.resolver.tool_resolver
loading_context.strict = not options.not_strict
options.workflow = options.cwltool
options.job_order = options.cwljob
uri, tool_file_uri = cwltool.load_tool.resolve_tool_uri(
options.cwltool, loading_context.resolver,
loading_context.fetcher_constructor)
options.tool_help = None
options.debug = options.logLevel == "DEBUG"
job_order_object, options.basedir, jobloader = \
cwltool.main.load_job_order(
options, sys.stdin, loading_context.fetcher_constructor,
loading_context.overrides_list, tool_file_uri)
loading_context, workflowobj, uri = cwltool.load_tool.fetch_document(uri, loading_context)
loading_context, uri = cwltool.load_tool.resolve_and_validate_document(loading_context, workflowobj, uri)
loading_context.overrides_list.extend(loading_context.metadata.get("cwltool:overrides", []))
document_loader = loading_context.loader
metadata = loading_context.metadata
processobj = document_loader.idx
if options.provenance and runtime_context.research_obj:
processobj['id'] = metadata['id']
processobj, metadata = loading_context.loader.resolve_ref(uri)
runtime_context.research_obj.packed_workflow(
cwltool.main.print_pack(document_loader, processobj, uri, metadata))
loading_context.overrides_list.extend(
metadata.get("cwltool:overrides", []))
from cwltool import workflow
from cwltool.resolver import tool_resolver
from cwltool.load_tool import resolve_tool_uri
loadingContext = LoadingContext()
loadingContext.construct_tool_object = getdefault(
loadingContext.construct_tool_object,
workflow.default_make_tool)
loadingContext.resolver = getdefault(loadingContext.resolver,
tool_resolver)
uri, tool_file_uri = resolve_tool_uri(
fname, resolver=loadingContext.resolver,
fetcher_constructor=loadingContext.fetcher_constructor)
document_loader, workflowobj, uri = fetch_document(
uri, resolver=loadingContext.resolver,
fetcher_constructor=loadingContext.fetcher_constructor)
document_loader, avsc_names, processobj, metadata, uri = \
validate_document(
document_loader, workflowobj, uri,
loadingContext.overrides_list, {},
enable_dev=loadingContext.enable_dev,
strict=loadingContext.strict,
preprocess_only=False,
fetcher_constructor=loadingContext.fetcher_constructor,
skip_schemas=False,
do_validate=loadingContext.do_validate)
# Recent versions of cwltool
else:
(loading_context, workflowobj, uri) = fetch_document(fname)
        loading_context, uri = resolve_and_validate_document(
            loading_context, workflowobj, uri)
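
# A compact sketch of the same old/new compatibility split: probe for the newer
# resolve_and_validate_document() API; older installations would need the
# validate_document() path shown above. Illustrative only, not any project's code.
import cwltool.load_tool

def load_and_validate(fname, loading_context):
    """Fetch and validate *fname*, preferring the newer cwltool API."""
    if hasattr(cwltool.load_tool, "resolve_and_validate_document"):
        loading_context, workflowobj, uri = cwltool.load_tool.fetch_document(
            fname, loading_context)
        loading_context, uri = cwltool.load_tool.resolve_and_validate_document(
            loading_context, workflowobj, uri)
        return loading_context, uri
    raise RuntimeError("older cwltool detected; use the validate_document branch above")
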
def load_tool(argsworkflow, # type: Union[Text, Dict[Text, Any]]
loadingContext # type: LoadingContext
): # type: (...) -> Process
document_loader, workflowobj, uri = fetch_document(
argsworkflow,
resolver=loadingContext.resolver,
fetcher_constructor=loadingContext.fetcher_constructor)
document_loader, avsc_names, _, metadata, uri = validate_document(
document_loader, workflowobj, uri,
enable_dev=loadingContext.enable_dev,
strict=loadingContext.strict,
fetcher_constructor=loadingContext.fetcher_constructor,
overrides=loadingContext.overrides_list,
        skip_schemas=True,
metadata=loadingContext.metadata)
    return make_tool(document_loader,
                     avsc_names,
                     metadata,
                     uri,
                     loadingContext)
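
# Hypothetical usage of the legacy load_tool() above (the path is illustrative):
#
#     tool = load_tool("my-workflow.cwl", LoadingContext())
#     print(tool.tool["class"])
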
    # The document in the loader's index may already have been
    # updated to the internal CWL version. We need to reload the
# document to go back to its original version.
#
# What's going on here is that the updater replaces the
# documents/fragments in the index with updated ones, the
# index is also used as a cache, so we need to go through the
# loading process with an empty index and updating turned off
# so we have the original un-updated documents.
#
loadingContext = loadingContext.copy()
document_loader = SubLoader(loader or loadingContext.loader or Loader({}))
loadingContext.do_update = False
loadingContext.loader = document_loader
loadingContext.loader.idx = {}
loadingContext.metadata = {}
loadingContext, docobj, uri = fetch_document(uri, loadingContext)
loadingContext, fileuri = resolve_and_validate_document(
loadingContext, docobj, uri, preprocess_only=True
)
if loadingContext.loader is None:
raise Exception("loadingContext.loader cannot be none")
processobj, metadata = loadingContext.loader.resolve_ref(uri)
document_loader = loadingContext.loader
if isinstance(processobj, MutableMapping):
document_loader.idx[processobj["id"]] = CommentedMap(processobj.items())
elif isinstance(processobj, MutableSequence):
_, frag = urllib.parse.urldefrag(uri)
for po in processobj:
if not frag:
if po["id"].endswith("#main"):
uri = po["id"]
job_order_object, input_basedir, jobloader = load_job_order(
args,
stdin,
loadingContext.fetcher_constructor,
loadingContext.overrides_list,
tool_file_uri,
)
if args.overrides:
loadingContext.overrides_list.extend(
load_overrides(
file_uri(os.path.abspath(args.overrides)), tool_file_uri
)
)
loadingContext, workflowobj, uri = fetch_document(uri, loadingContext)
if args.print_deps and loadingContext.loader:
printdeps(
workflowobj, loadingContext.loader, stdout, args.relative_deps, uri
)
return 0
loadingContext, uri = resolve_and_validate_document(
loadingContext,
workflowobj,
uri,
preprocess_only=(args.print_pre or args.pack),
skip_schemas=args.skip_schemas,
)
if loadingContext.loader is None: