Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_argparse(name, script_contents, params, tmpdir):
    """Write *script_contents* to a temp file and expect ``main`` to succeed.

    ``params`` is a callable that, given the temp script's path, yields the
    extra CLI arguments. A clean ``SystemExit`` (code 0) also counts as
    success; the temp file is always removed afterwards.
    """
    tmp_script = None
    try:
        tmp_script = NamedTemporaryFile(mode="w", delete=False)
        tmp_script.write(script_contents)
        tmp_script.close()
        # Base args first, then the scenario-specific ones built from the path.
        argv = ["--outdir", str(tmpdir)]
        for extra in params(tmp_script.name):
            argv.append(extra)
        assert main(argv) == 0, name
    except SystemExit as err:
        # argparse may exit instead of returning; only exit code 0 is OK.
        assert err.code == 0, name
    finally:
        if tmp_script and tmp_script.name and os.path.exists(tmp_script.name):
            os.unlink(tmp_script.name)
def test_require_prefix_networkaccess() -> None:
    """The NetworkAccess extension needs --enable-ext; a bad tool still fails."""
    ok_tool = get_data("tests/wf/networkaccess.cwl")
    bad_tool = get_data("tests/wf/networkaccess-fail.cwl")
    # Accepted only when the extensions flag is present.
    assert main(["--enable-ext", ok_tool]) == 0
    # Same document without the flag must be rejected.
    assert main([ok_tool]) != 0
    # The deliberately broken document fails even with the flag.
    assert main(["--enable-ext", bad_tool]) != 0
"file://{cwl_posix_path}#reverse_sort" -> "file://{cwl_posix_path}#sorted";
}}
""".format(cwl_posix_path=cwl_posix_path))[0]
# NOTE(review): headerless fragment — the triple-quoted DOT document above is
# opened (and the enclosing test function defined) outside this chunk.
# Render the workflow as DOT via --print-dot, capturing it from stdout.
stdout = StringIO()
assert main(["--print-dot", cwl_path], stdout=stdout) == 0
computed_dot = pydot.graph_from_dot_data(stdout.getvalue())[0]
# Compare the edge sets by URL fragment only, order-insensitively.
computed_edges = sorted(
[(urlparse(source).fragment, urlparse(target).fragment) for source, target in computed_dot.obj_dict['edges']])
expected_edges = sorted(
[(urlparse(source).fragment, urlparse(target).fragment) for source, target in expected_dot.obj_dict['edges']])
assert computed_edges == expected_edges
# print CommandLineTool
# --print-dot on a CommandLineTool (not a Workflow) is expected to fail (exit 1).
cwl_path = get_data("tests/wf/echo.cwl")
stdout = StringIO()
assert main(["--debug", "--print-dot", cwl_path], stdout=stdout) == 1
# NOTE(review): this function is truncated in this chunk — the closing of the
# `expected` dict and the assertion against the captured output are not shown.
def test_input_deps() -> None:
# Capture cwltool's stdout so the printed input-dependency listing can be checked.
stream = StringIO()
main(
[
"--print-input-deps",
get_data("tests/wf/count-lines1-wf.cwl"),
get_data("tests/wf/wc-job.json"),
],
stdout=stream,
)
# Expected dependency record for the job object; locations are bare basenames.
expected = {
"class": "File",
"location": "wc-job.json",
"format": CWL_IANA,
# whale.txt is listed as a secondary/referenced file of the job input.
"secondaryFiles": [
{
"class": "File",
"location": "whale.txt",
def test_directory_dest2():
    """A nested Directory destination ends up at the expected output path."""
    with temp_dir("out") as out_dir:
        tool = get_data('tests/destination/directory-dest2.cwl')
        # Extensions are required for the destination feature under test.
        exit_code = main(["--enable-ext", "--outdir="+out_dir, tool])
        assert exit_code == 0
        # The tool's output must land at bar/baz/foo inside the out dir.
        assert os.path.isfile(os.path.join(out_dir, "bar/baz/foo"))
def test_target_packed() -> None:
    """Test --target option with packed workflow schema."""
    # Select only the "out" output of the packed scatter workflow.
    argv = [
        "--target",
        "out",
        get_data("tests/wf/scatter-wf4.json"),
        "--inp1",
        "INP1",
        "--inp2",
        "INP2",
    ]
    assert main(argv) == 0
# NOTE(review): truncated fragment — the third main() call starting at the end
# of this chunk is cut off mid-expression.
def test_dont_require_inputs():
# Python 2's main() wrote bytes; choose the matching in-memory stream type.
if sys.version_info[0] < 3:
stream = BytesIO()
else:
stream = StringIO()
script = None
try:
# delete=False so the file survives close() and its path can be handed to
# main(); cleanup is presumably in the (not shown) finally block — confirm.
script = NamedTemporaryFile(mode="w", delete=False)
script.write(script_a)
script.close()
# With the --input argument supplied, the run succeeds (exit 0).
assert (
main(
argsl=["--debug", script.name, "--input", script.name],
executor=NoopJobExecutor(),
stdout=stream,
)
== 0
)
# Without the required input, main() exits with code 2 (usage error).
assert (
main(
argsl=["--debug", script.name],
executor=NoopJobExecutor(),
stdout=stream,
)
== 2
)
assert (
main(
def test_overrides_fails(parameters: List[str], expected_error: str) -> None:
    """Running ``main`` with *parameters* exits 1 and reports *expected_error*."""
    err_stream = StringIO()
    exit_code = main(parameters, stderr=err_stream)
    assert exit_code == 1
    # Include the captured stderr in the failure message for easier debugging.
    captured = err_stream.getvalue()
    assert expected_error in captured, captured
# NOTE(review): headerless fragment — these keyword arguments close a call whose
# opening (and the enclosing function's signature) lie outside this chunk.
remote_storage_url=parsed_args.remote_storage_url,
token=parsed_args.token)
# Seed cwltool's RuntimeContext from every parsed CLI option.
runtime_context = cwltool.main.RuntimeContext(vars(parsed_args))
# FTP-backed filesystem access; --insecure presumably disables TLS verification
# for the remote storage backend — confirm against CachingFtpFsAccess.
runtime_context.make_fs_access = functools.partial(
CachingFtpFsAccess, insecure=parsed_args.insecure)
runtime_context.path_mapper = functools.partial(
TESPathMapper, fs_access=ftp_fs_access)
# --parallel selects the multithreaded executor; otherwise jobs run serially.
job_executor = MultithreadedJobExecutor() if parsed_args.parallel \
else SingleJobExecutor()
# Lift local resource caps; limits are presumably enforced remotely by TES.
job_executor.max_ram = job_executor.max_cores = float("inf")
# Wrap the TES execution entry point with the contexts and FTP access baked in.
executor = functools.partial(
tes_execute, job_executor=job_executor,
loading_context=loading_context,
remote_storage_url=parsed_args.remote_storage_url,
ftp_access=ftp_fs_access)
# Delegate to cwltool's main with the custom executor and contexts plugged in.
return cwltool.main.main(
args=parsed_args,
executor=executor,
loadingContext=loading_context,
runtimeContext=runtime_context,
versionfunc=versionstring,
logger_handler=console
)
)
# NOTE(review): headerless fragment of an AWS Batch job-submission helper;
# `shellcode`, `args`, and `payload_url` are defined outside this chunk, and
# the `elif` below belongs to an `if` that is not visible here.
# mktemp template embeds Batch environment metadata so script names are traceable.
tmpdir_fmt = "${AWS_BATCH_CE_NAME:-$AWS_EXECUTION_ENV}.${AWS_BATCH_JQ_NAME:-}.${AWS_BATCH_JOB_ID:-}.XXXXX"
# Download the payload script inside the container, mark executable, run it.
shellcode += ['BATCH_SCRIPT=$(mktemp --tmpdir "{tmpdir_fmt}")'.format(tmpdir_fmt=tmpdir_fmt),
"apt-get update -qq",
"apt-get install -qqy --no-install-suggests --no-install-recommends curl ca-certificates gnupg",
"curl -L '{payload_url}' > $BATCH_SCRIPT".format(payload_url=payload_url),
"chmod +x $BATCH_SCRIPT",
"$BATCH_SCRIPT"]
elif args.cwl:
# CWL mode: job bookkeeping goes to DynamoDB, staging data to S3.
ensure_dynamodb_table("aegea-batch-jobs", hash_key_name="job_id")
bucket = ensure_s3_bucket(args.staging_s3_bucket or "aegea-batch-jobs-" + ARN.get_account_id())
args.environment.append(dict(name="AEGEA_BATCH_S3_BASE_URL", value="s3://" + bucket.name))
from cwltool.main import main as cwltool_main
# Preprocess the CWL document (--print-pre) and pass the workflow definition
# plus the job input to the container, base64-encoded in environment variables.
with io.BytesIO() as preprocessed_cwl:
if cwltool_main(["--print-pre", args.cwl], stdout=preprocessed_cwl) != 0:
raise AegeaException("Error while running cwltool")
# NOTE(review): yaml.load without an explicit Loader is unsafe on untrusted
# input — consider yaml.safe_load; confirm the CWL source is trusted.
cwl_spec = yaml.load(preprocessed_cwl.getvalue())
payload = base64.b64encode(preprocessed_cwl.getvalue()).decode()
args.environment.append(dict(name="AEGEA_BATCH_CWL_DEF_B64", value=payload))
payload = base64.b64encode(args.cwl_input.read()).decode()
args.environment.append(dict(name="AEGEA_BATCH_CWL_JOB_B64", value=payload))
# Inspect requirements; Docker image handling is not implemented yet.
for requirement in cwl_spec.get("requirements", []):
if requirement["class"] == "DockerRequirement":
# FIXME: dockerFile support: ensure_ecr_image(...)
# container_props["image"] = requirement["dockerPull"]
pass
shellcode += [
# 'sed -i -e "s|http://archive.ubuntu.com|http://us-east-1.ec2.archive.ubuntu.com|g" /etc/apt/sources.list',
# "apt-get update -qq",