# Propagate the spec's env:variables into the environment used to launch workers.
senv = spec.environment
spenv = os.environ.copy()
yenv = None
if senv:
    yenv = get_yaml_var(senv, "variables", {})
    for k, v in yenv.items():
        spenv[str(k)] = str(v)
        # For expandvars
        os.environ[str(k)] = str(v)

worker_list = []
local_queues = []

for worker_name, worker_val in workers.items():
    worker_machines = get_yaml_var(worker_val, "machines", None)
    if worker_machines:
        LOG.debug(f"check machines = {check_machines(worker_machines)}")
        if not check_machines(worker_machines):
            continue

        if yenv:
            output_path = get_yaml_var(yenv, "OUTPUT_PATH", None)
            if output_path and not os.path.exists(output_path):
                hostname = socket.gethostname()
                LOG.error(f"The output path, {output_path}, is not accessible on this host, {hostname}")
        else:
            LOG.warning(
                "The env:variables section does not have an OUTPUT_PATH "
                "specified, multi-machine checks cannot be performed."
            )

    worker_args = get_yaml_var(worker_val, "args", celery_args)
    with suppress(KeyError):
        if worker_val["args"] is None:
            worker_args = ""

    worker_nodes = get_yaml_var(worker_val, "nodes", None)
    worker_batch = get_yaml_var(worker_val, "batch", None)

    wsteps = get_yaml_var(worker_val, "steps", steps)
    queues = spec.make_queue_string(wsteps).split(",")

    # Check for missing arguments
    parallel = batch_check_parallel(spec)
    if parallel:
        if "--concurrency" not in worker_args:
            LOG.warning("The worker arg --concurrency [1-4] is recommended when running parallel tasks")
        if "--prefetch-multiplier" not in worker_args:
            LOG.warning("The worker arg --prefetch-multiplier 1 is recommended when running parallel tasks")
        if "fair" not in worker_args:
            LOG.warning("The worker arg -O fair is recommended when running parallel tasks")
launchs += f" -t {walltime}"
if launcher == "lsf":
# The jsrun utility does not have a time argument
launchs = f"jsrun -a 1 -c ALL_CPUS -g ALL_GPUS --bind=none -n {nodes}"
launchs += f" {launch_args}"
# Allow for any pre launch manipulation, e.g. module load
# hwloc/1.11.10-cuda
if launch_pre:
launchs = f"{launch_pre} {launchs}"
worker_cmd = f"{launchs} {com}"
if btype == "flux":
flux_path = get_yaml_var(batch, "flux_path", "")
flux_opts = get_yaml_var(batch, "flux_start_opts", "")
flux_exec_workers = get_yaml_var(batch, "flux_exec_workers", True)
flux_exec = ""
if flux_exec_workers:
flux_exec = "flux exec"
if "/" in flux_path:
flux_path += "/"
flux_exe = os.path.join(flux_path, "flux")
launch = (
f"{launchs} {flux_exe} start {flux_opts} {flux_exec} `which {shell}` -c"
)
worker_cmd = f'{launch} "{com}"'
if nodes is None:
# Use the value in the batch section
nodes = get_yaml_var(batch, "nodes", None)
# Get the number of nodes from the environment if unset
if nodes is None or nodes == "all":
nodes = get_node_count(default=1)
bank = get_yaml_var(batch, "bank", "")
queue = get_yaml_var(batch, "queue", "")
shell = get_yaml_var(batch, "shell", "bash")
walltime = get_yaml_var(batch, "walltime", "")
launch_pre = get_yaml_var(batch, "launch_pre", "")
launch_args = get_yaml_var(batch, "launch_args", "")
worker_launch = get_yaml_var(batch, "worker_launch", "")
if btype == "flux":
launcher = get_batch_type()
else:
launcher = get_batch_type()
launchs = worker_launch
if not launchs:
if btype == "slurm" or launcher == "slurm":
launchs = f"srun --mpi=none -N {nodes} -n {nodes}"
if bank:
launchs += f" -A {bank}"
if queue:
launchs += f" -p {queue}"
if walltime:
# all non flux lsf submissions need to be local.
if btype == "local" or "lsf" in btype:
return com
if nodes is None:
# Use the value in the batch section
nodes = get_yaml_var(batch, "nodes", None)
# Get the number of nodes from the environment if unset
if nodes is None or nodes == "all":
nodes = get_node_count(default=1)
bank = get_yaml_var(batch, "bank", "")
queue = get_yaml_var(batch, "queue", "")
shell = get_yaml_var(batch, "shell", "bash")
walltime = get_yaml_var(batch, "walltime", "")
launch_pre = get_yaml_var(batch, "launch_pre", "")
launch_args = get_yaml_var(batch, "launch_args", "")
worker_launch = get_yaml_var(batch, "worker_launch", "")
if btype == "flux":
launcher = get_batch_type()
else:
launcher = get_batch_type()
launchs = worker_launch
if not launchs:
if btype == "slurm" or launcher == "slurm":
launchs = f"srun --mpi=none -N {nodes} -n {nodes}"
if bank:
launchs += f" -A {bank}"
# A jsrun submission cannot be run under a parent jsrun so
# all non flux lsf submissions need to be local.
if btype == "local" or "lsf" in btype:
return com
if nodes is None:
# Use the value in the batch section
nodes = get_yaml_var(batch, "nodes", None)
# Get the number of nodes from the environment if unset
if nodes is None or nodes == "all":
nodes = get_node_count(default=1)
bank = get_yaml_var(batch, "bank", "")
queue = get_yaml_var(batch, "queue", "")
shell = get_yaml_var(batch, "shell", "bash")
walltime = get_yaml_var(batch, "walltime", "")
launch_pre = get_yaml_var(batch, "launch_pre", "")
launch_args = get_yaml_var(batch, "launch_args", "")
worker_launch = get_yaml_var(batch, "worker_launch", "")
if btype == "flux":
launcher = get_batch_type()
else:
launcher = get_batch_type()
launchs = worker_launch
if not launchs:
if btype == "slurm" or launcher == "slurm":
launchs = f"srun --mpi=none -N {nodes} -n {nodes}"