# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
""" Does no wrapping. Just returns the command as-is
"""
def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
    """Return *command* unchanged — this launcher performs no wrapping.

    Args:
        - command (string): The command string to be launched
        - task_block (string) : bash evaluated string.
    KWargs:
        - walltime (int) : This is not used by this launcher.
    """
    return command
class SingleNodeLauncher(Launcher):
    """ Worker launcher that wraps the user's command with the framework to
    launch multiple command invocations in parallel. This wrapper sets the
    bash env variable CORES to the number of cores on the machine. By setting
    task_blocks to an integer or to a bash expression the number of invocations
    of the command to be launched can be controlled.
    """
    def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
        """Wrap *command* in a bash script and launch it via srun.

        Args:
            - command (string): The command string to be launched
            - task_block (string) : bash evaluated string.
        KWargs:
            - walltime (int) : This is not used by this launcher.

        Returns:
            str: the wrapped command string.
        """
        task_blocks = tasks_per_node * nodes_per_block
        # FIX: the opening `x = '''` of the wrapper template was missing,
        # leaving the shell text below as bare (invalid) Python source.
        # NOTE(review): the class docstring promises a CORES env variable, but
        # the visible template never sets one — confirm against upstream.
        x = '''
cat << SLURM_EOF > cmd_$SLURM_JOB_NAME.sh
{0}
SLURM_EOF
chmod a+x cmd_$SLURM_JOB_NAME.sh
TASKBLOCKS={1}
srun --ntasks $TASKBLOCKS -l bash cmd_$SLURM_JOB_NAME.sh
echo "Done"
'''.format(command, task_blocks)
        return x
class SrunMPILauncher(Launcher):
    """Launches as many workers as MPI tasks to be executed concurrently within a block.

    Use this launcher instead of SrunLauncher if each block will execute multiple MPI applications
    at the same time. Workers should be launched with independent Srun calls so as to setup the
    environment for MPI application launch.
    """
    def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
        """Wrap *command* in a bash script launched once per worker via mpiexec.

        Args:
            - command (string): The command string to be launched
            - task_block (string) : bash evaluated string.
        KWargs:
            - walltime (int) : This is not used by this launcher.

        Returns:
            str: the wrapped command string.
        """
        task_blocks = tasks_per_node * nodes_per_block
        # FIX: the opening `x = '''` of the wrapper template was missing,
        # leaving the shell text below as bare (invalid) Python source.
        # NOTE(review): the template preamble looks truncated upstream of this
        # chunk — the `fi` below has no matching `if`, and $HOSTFILE, $JOBNAME
        # and $WORKERCOUNT are never assigned here. Restore from upstream
        # before relying on this template.
        x = '''
sort -u $PBS_NODEFILE > $HOSTFILE
fi

cat << MPIEXEC_EOF > cmd_$JOBNAME.sh
{0}
MPIEXEC_EOF
chmod u+x cmd_$JOBNAME.sh
mpiexec --bind-to none -n $WORKERCOUNT --hostfile $HOSTFILE /usr/bin/sh cmd_$JOBNAME.sh
echo "All workers done"
'''.format(command, tasks_per_node, nodes_per_block, task_blocks)
        return x
class SrunLauncher(Launcher):
    """ Worker launcher that wraps the user's command with the SRUN launch framework
    to launch multiple cmd invocations in parallel on a single job allocation.
    """
    def __init__(self, overrides=''):
        """
        Parameters
        ----------
        overrides: str
            Extra flags interpolated into the launch line. Default: ''

        FIX: the template below interpolates {overrides} from self.overrides,
        but __init__ previously defined nothing, so every call raised
        AttributeError. The default '' keeps no-argument construction working.
        """
        self.overrides = overrides

    def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
        """Wrap *command* in a bash script and launch it on the allocation.

        Args:
            - command (string): The command string to be launched
            - task_block (string) : bash evaluated string.
        KWargs:
            - walltime (int) : This is not used by this launcher.

        Returns:
            str: the wrapped command string.
        """
        # FIX: tasks_per_block was interpolated below but never computed
        # (NameError at call time).
        tasks_per_block = tasks_per_node * nodes_per_block
        # FIX: restored the missing `x = '''` opener and the heredoc line that
        # writes the wrapper script; both were lost, leaving bare shell text.
        # NOTE(review): this template invokes aprun, not srun, despite the
        # class name — content from AprunLauncher appears spliced in; confirm
        # against upstream.
        x = '''
cat << APRUN_EOF > cmd_$JOBNAME.sh
{0}
APRUN_EOF
chmod a+x cmd_$JOBNAME.sh
aprun -n {tasks_per_block} -N {tasks_per_node} {overrides} /bin/bash cmd_$JOBNAME.sh &
wait
echo "Done"
'''.format(command, tasks_per_block,
           tasks_per_block=tasks_per_block,
           tasks_per_node=tasks_per_node,
           overrides=self.overrides)
        return x
class JsrunLauncher(Launcher):
    """ Worker launcher that wraps the user's command with the Jsrun launch framework
    to launch multiple cmd invocations in parallel on a single job allocation
    """
    def __init__(self, overrides=''):
        """
        Parameters
        ----------
        overrides: str
            This string will be passed to the JSrun launcher. Default: ''
        """
        self.overrides = overrides

    def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
        """Wrap *command* for parallel launch across the allocation.

        Args:
            - command (string): The command string to be launched
            - tasks_per_node (int) : Workers to launch per node
            - nodes_per_block (int) : Number of nodes in a block
        KWargs:
            - walltime (int) : This is not used by this launcher.
              (FIX: added with a default so the signature matches the other
              launchers in this module — backward compatible.)

        Returns:
            str: the wrapped command string.
        """
        # FIX: task_blocks was interpolated below but never computed.
        task_blocks = tasks_per_node * nodes_per_block
        # FIX: the __call__ docstring was left unterminated and swallowed the
        # template; restored the closing quotes and the `x = '''` opener.
        # NOTE(review): the visible template uses srun with SLURM variables,
        # not jsrun, never uses self.overrides, and its preamble (the heredoc
        # writing cmd_$SLURM_JOB_NAME.sh, NODES/TASKBLOCKS assignments, and
        # the `if` matching the `fi` below) is truncated — restore from
        # upstream before use.
        x = '''
NODES_PER_BLOCK=$(( $NODES / $TASKBLOCKS ))
for blk in $(seq 1 1 $TASKBLOCKS):
do
    srun --exclusive --nodes $NODES_PER_BLOCK -l bash cmd_$SLURM_JOB_NAME.sh &
done
wait
fi
echo "Done"
'''.format(command, task_blocks)
        return x
class AprunLauncher(Launcher):
    """ Worker launcher that wraps the user's command with the Aprun launch framework
    to launch multiple cmd invocations in parallel on a single job allocation
    """
    def __init__(self, overrides=''):
        # overrides: extra aprun flags interpolated into the launch line.
        self.overrides = overrides

    def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
        """Wrap *command* in a bash script launched via aprun.

        Args:
            - command (string): The command string to be launched
            - tasks_per_node (int) : Workers to launch per node
            - nodes_per_block (int) : Number of nodes in a block
        KWargs:
            - walltime (int) : This is not used by this launcher.

        Returns:
            str: the wrapped command string.
        """
        # FIX: the body of this method was truncated in the source (the
        # docstring was unterminated and everything after it lost).
        # Reconstructed from the aprun template visible elsewhere in this
        # file — NOTE(review): verify against the upstream implementation.
        tasks_per_block = tasks_per_node * nodes_per_block
        x = '''
cat << APRUN_EOF > cmd_$JOBNAME.sh
{0}
APRUN_EOF
chmod a+x cmd_$JOBNAME.sh
aprun -n {tasks_per_block} -N {tasks_per_node} {overrides} /bin/bash cmd_$JOBNAME.sh &
wait
echo "Done"
'''.format(command,
           tasks_per_block=tasks_per_block,
           tasks_per_node=tasks_per_node,
           overrides=self.overrides)
        return x
from abc import ABCMeta, abstractmethod
from parsl.utils import RepresentationMixin
class Launcher(RepresentationMixin, metaclass=ABCMeta):
    """ Launcher base class to enforce launcher interface
    """
    @abstractmethod
    def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
        """ Wraps the command with the Launcher calls.

        *MUST* be implemented by the concrete child classes.

        FIX: added walltime=None so the abstract signature matches the
        concrete launchers in this module, which all accept it.
        """
        pass
class SimpleLauncher(Launcher):
    """ Does no wrapping. Just returns the command as-is
    """
    def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
        """Return *command* unchanged.

        Args:
            - command (string): The command string to be launched
            - task_block (string) : bash evaluated string.
        KWargs:
            - walltime (int) : This is not used by this launcher.
              (FIX: accepted with a default for signature consistency with
              the other launchers in this module — backward compatible.)
        """
        return command
class SingleNodeLauncher(Launcher):
    """ Worker launcher that wraps the user's command with the framework to
    launch multiple command invocations in parallel.

    NOTE(review): the original class docstring was truncated mid-sentence by
    the corruption in this chunk; restore the full text from upstream.
    """
    def __init__(self, overrides=''):
        # FIX: the srun line below interpolates {overrides} from
        # self.overrides, but no __init__ survived in this chunk; restored
        # with a backward-compatible '' default.
        self.overrides = overrides

    def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
        """Wrap *command* in a bash script and launch it via srun.

        Args:
            - command (string): The command string to be launched
        KWargs:
            - walltime (int) : This is not used by this launcher.

        Returns:
            str: the wrapped command string.
        """
        # FIX: the `def __call__` header, the task_blocks computation and the
        # `x = '''` opener were all lost in this chunk, leaving bare shell
        # text; reconstructed from the intact copy of this template earlier
        # in the file.
        task_blocks = tasks_per_node * nodes_per_block
        x = '''
cat << SLURM_EOF > cmd_$SLURM_JOB_NAME.sh
{0}
SLURM_EOF
chmod a+x cmd_$SLURM_JOB_NAME.sh
TASKBLOCKS={1}
srun --ntasks $TASKBLOCKS -l {overrides} bash cmd_$SLURM_JOB_NAME.sh
echo "Done"
'''.format(command, task_blocks, overrides=self.overrides)
        return x
class SrunMPILauncher(Launcher):
    """Launches as many workers as MPI tasks to be executed concurrently within a block.

    Use this launcher instead of SrunLauncher if each block will execute multiple MPI applications
    at the same time. Workers should be launched with independent Srun calls so as to setup the
    environment for MPI application launch.
    """
    def __init__(self, overrides=''):
        """
        Parameters
        ----------
        overrides: str
            This string will be passed to the launcher. Default: ''
        """
        self.overrides = overrides

    def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
        """Wrap *command* for one invocation per worker via GNU parallel.

        Args:
            - command (string): The command string to be launched
        KWargs:
            - walltime (int) : This is not used by this launcher.

        Returns:
            str: the wrapped command string.
        """
        # FIX: the `def __call__` header, the task_blocks computation and the
        # `x = '''` opener were lost in this chunk, leaving bare shell text.
        # NOTE(review): the template uses GNU parallel (not srun) and its
        # preamble assigning $PFILE, $JOBNAME, $WORKERCOUNT and $SSHLOGINFILE
        # is truncated — restore from upstream; self.overrides is also unused
        # in the visible portion.
        task_blocks = tasks_per_node * nodes_per_block
        x = '''
cp /dev/null $PFILE
for COUNT in $(seq 1 1 $WORKERCOUNT)
do
    echo "sh cmd_$JOBNAME.sh" >> $PFILE
done
parallel --env _ --joblog "$JOBNAME.sh.parallel.log" \
--sshloginfile $SSHLOGINFILE --jobs {1} < $PFILE
echo "All workers done"
'''.format(command, tasks_per_node, nodes_per_block, task_blocks)
        return x
class MpiExecLauncher(Launcher):
    """ Worker launcher that wraps the user's command with the framework to
    launch multiple command invocations via mpiexec.

    This wrapper sets the bash env variable CORES to the number of cores on the
    machine.

    This launcher makes the following assumptions:
    - mpiexec is installed and can be located in $PATH
    - The provider makes available the $PBS_NODEFILE environment variable
    """
    def __init__(self, overrides=''):
        # FIX: the template below interpolates {overrides} from
        # self.overrides, but no __init__ survived in this chunk; restored
        # with a backward-compatible '' default.
        self.overrides = overrides

    def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
        """Wrap *command* for launch across the allocation.

        Args:
            - command (string): The command string to be launched
            - task_block (string) : bash evaluated string.
        KWargs:
            - walltime (int) : This is not used by this launcher.

        Returns:
            str: the wrapped command string.
        """
        # FIX: the __call__ docstring was unterminated and swallowed the
        # template; restored the closing quotes, the task_blocks computation
        # and the `x = '''` opener.
        # NOTE(review): despite the class name this template uses srun with
        # SLURM variables, never calls mpiexec, and its preamble (heredoc
        # writing cmd_$SLURM_JOB_NAME.sh, NODES/TASKBLOCKS assignments, the
        # `if` matching the `fi` below) is truncated — restore from upstream.
        task_blocks = tasks_per_node * nodes_per_block
        x = '''
NODES_PER_BLOCK=$(( $NODES / $TASKBLOCKS ))
for blk in $(seq 1 1 $TASKBLOCKS):
do
    srun --exclusive --nodes $NODES_PER_BLOCK -l {overrides} bash cmd_$SLURM_JOB_NAME.sh &
done
wait
fi
echo "Done"
'''.format(command, task_blocks, overrides=self.overrides)
        return x
class AprunLauncher(Launcher):
    """ Worker launcher that wraps the user's command with the Aprun launch framework
    to launch multiple cmd invocations in parallel on a single job allocation
    """
    def __init__(self, overrides=''):
        """
        Parameters
        ----------
        overrides: str
            This string will be passed to the aprun launcher. Default: ''
        """
        self.overrides = overrides

    def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
        """Wrap *command* in a bash script launched via aprun.

        Args:
            - command (string): The command string to be launched
            - tasks_per_node (int) : Workers to launch per node
            - nodes_per_block (int) : Number of nodes in a block
        KWargs:
            - walltime (int) : This is not used by this launcher.
              (FIX: added with a default for signature consistency with the
              other launchers — backward compatible.)

        Returns:
            str: the wrapped command string.
        """
        # FIX: this method was truncated immediately after its opening
        # docstring quotes in the source; body reconstructed from the aprun
        # template visible elsewhere in this file — NOTE(review): verify
        # against the upstream implementation.
        tasks_per_block = tasks_per_node * nodes_per_block
        x = '''
cat << APRUN_EOF > cmd_$JOBNAME.sh
{0}
APRUN_EOF
chmod a+x cmd_$JOBNAME.sh
aprun -n {tasks_per_block} -N {tasks_per_node} {overrides} /bin/bash cmd_$JOBNAME.sh &
wait
echo "Done"
'''.format(command,
           tasks_per_block=tasks_per_block,
           tasks_per_node=tasks_per_node,
           overrides=self.overrides)
        return x
from parsl.utils import RepresentationMixin
class Launcher(RepresentationMixin, metaclass=ABCMeta):
    """Abstract interface that every launcher in this module implements.

    A launcher receives a command string plus the block geometry and returns
    the (possibly wrapped) command string that will be submitted.
    """

    @abstractmethod
    def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
        """Wrap *command* with this launcher's invocation framework.

        *MUST* be implemented by the concrete child classes.
        """
class SimpleLauncher(Launcher):
    """Pass-through launcher: applies no wrapping at all."""

    def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
        """Return *command* exactly as received.

        Args:
            - command (string): The command string to be launched
            - task_block (string) : bash evaluated string.
        KWargs:
            - walltime (int) : This is not used by this launcher.
        """
        return command
class SingleNodeLauncher(Launcher):