cp /dev/null $PFILE
for COUNT in $(seq 1 1 $WORKERCOUNT)
do
echo "sh cmd_$JOBNAME.sh" >> $PFILE
done
parallel --env _ --joblog "$JOBNAME.sh.parallel.log" \
--sshloginfile $SSHLOGINFILE --jobs {1} < $PFILE
echo "All workers done"
'''.format(command, tasks_per_node, nodes_per_block, task_blocks)
return x
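# Illustrative note on the GNU parallel template above (values hypothetical):
# the loop writes WORKERCOUNT copies of "sh cmd_$JOBNAME.sh" into $PFILE
# (WORKERCOUNT is set earlier in the template, outside this fragment), and
# GNU parallel then spreads those invocations over the hosts listed in
# $SSHLOGINFILE, running at most --jobs {1} (i.e. tasks_per_node) per host.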
class MpiExecLauncher(Launcher):
""" Worker launcher that wraps the user's command with the framework to
launch multiple command invocations via mpiexec.
This wrapper sets the bash env variable CORES to the number of cores on the
machine.
This launcher makes the following assumptions:
- mpiexec is installed and can be located in $PATH
- The provider makes available the $PBS_NODEFILE environment variable
"""
def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
"""
Args:
- command (string): The command string to be launched
- tasks_per_node (int) : Workers to launch per node
- nodes_per_block (int) : Number of nodes in a block
CMD ( ) {{
{0}
}}
for COUNT in $(seq 1 1 $WORKERCOUNT)
do
echo "Launching worker: $COUNT"
CMD &
done
wait
echo "All workers done"
'''.format(command, task_blocks)
return x
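# Illustrative note on the template above: CMD is defined as a bash function
# wrapping the user command ({0}), the loop forks WORKERCOUNT copies of it
# into the background with "CMD &", and "wait" blocks until every worker has
# exited. With a hypothetical WORKERCOUNT of 4, four copies of the command
# run concurrently.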
class GnuParallelLauncher(Launcher):
""" Worker launcher that wraps the user's command with the framework to
launch multiple command invocations via GNU parallel sshlogin.
This wrapper sets the bash env variable CORES to the number of cores on the
machine.
This launcher makes the following assumptions:
- GNU parallel is installed and can be located in $PATH
- Passwordless SSH login is configured between the controller node and the
target nodes.
- The provider makes available the $PBS_NODEFILE environment variable
"""
def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
"""
Args:
- command (string): The command string to be launched
sort -u $PBS_NODEFILE > $HOSTFILE
fi
cat << MPIEXEC_EOF > cmd_$JOBNAME.sh
{0}
MPIEXEC_EOF
chmod u+x cmd_$JOBNAME.sh
mpiexec --bind-to none -n $WORKERCOUNT --hostfile $HOSTFILE /usr/bin/sh cmd_$JOBNAME.sh
echo "All workers done"
'''.format(command, tasks_per_node, nodes_per_block, task_blocks)
return x
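# Illustrative note (hypothetical values): the heredoc above writes the user
# command into cmd_$JOBNAME.sh, and mpiexec starts $WORKERCOUNT copies of it
# across the hosts in $HOSTFILE (the unique entries of $PBS_NODEFILE). With
# tasks_per_node=2 and nodes_per_block=4, WORKERCOUNT would typically be
# 2 * 4 = 8, matching task_blocks = tasks_per_node * nodes_per_block.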
class SrunLauncher(Launcher):
""" Worker launcher that wraps the user's command with the SRUN launch framework
to launch multiple command invocations in parallel on a single job allocation.
"""
def __init__(self, overrides=''):
"""
Parameters
----------
overrides: str
This string will be passed to the srun launcher. Default: ''
"""
self.overrides = overrides
def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
"""
NODES_PER_BLOCK=$(( $NODES / $TASKBLOCKS ))
for blk in $(seq 1 1 $TASKBLOCKS)
do
srun --exclusive --nodes $NODES_PER_BLOCK -l {overrides} bash cmd_$SLURM_JOB_NAME.sh &
done
wait
fi
echo "Done"
'''.format(command, task_blocks, overrides=self.overrides)
return x
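# Illustrative note (hypothetical values): with NODES=8 and TASKBLOCKS=4 the
# template above computes NODES_PER_BLOCK = 8 / 4 = 2 and launches four
# concurrent "srun --exclusive --nodes 2 ..." invocations in the background,
# then waits for all of them; the `overrides` string is spliced into each
# srun command line.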
class AprunLauncher(Launcher):
""" Worker launcher that wraps the user's command with the Aprun launch framework
to launch multiple command invocations in parallel on a single job allocation.
"""
def __init__(self, overrides=''):
"""
Parameters
----------
overrides: str
This string will be passed to the aprun launcher. Default: ''
"""
self.overrides = overrides
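# Illustrative usage sketch (the overrides value is hypothetical, not from
# the library):
#
#     launcher = AprunLauncher(overrides='-cc depth')
#     wrapped = launcher('my_worker.sh', tasks_per_node=2, nodes_per_block=4)
#
# The overrides string is passed through to the aprun invocation unchanged.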
def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
"""
from abc import ABCMeta, abstractmethod
from parsl.utils import RepresentationMixin
class Launcher(RepresentationMixin, metaclass=ABCMeta):
""" Launcher base class to enforce launcher interface
"""
@abstractmethod
def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
""" Wraps the command with the Launcher calls.
*MUST* be implemented by the concrete child classes
"""
pass
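# A minimal sketch of a concrete subclass satisfying this interface.
# EchoLauncher is hypothetical and not part of the library; it only
# illustrates that __call__ receives the command plus the block geometry and
# must return a (possibly wrapped) command string.
class EchoLauncher(Launcher):
    def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
        # Prepend a marker; the real launchers below wrap the command in a
        # bash template instead.
        return "echo 'launching block'; {}".format(command)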
class SimpleLauncher(Launcher):
""" Does no wrapping. Just returns the command as-is
"""
def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
"""
Args:
- command (string): The command string to be launched
- tasks_per_node (int) : Workers to launch per node
- nodes_per_block (int) : Number of nodes in a block
KWargs:
- walltime (int) : This is not used by this launcher.
"""
return command
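# Illustrative usage sketch (the command string is hypothetical):
#
#     SimpleLauncher()('my_worker.sh', tasks_per_node=2, nodes_per_block=1)
#
# returns 'my_worker.sh' unchanged, since this launcher does no wrapping.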
class SingleNodeLauncher(Launcher):
""" Worker launcher that wraps the user's command with the framework to
launch multiple command invocations in parallel. This wrapper sets the
bash env variable CORES to the number of cores on the machine. By setting
task_blocks to an integer or to a bash expression the number of invocations
of the command to be launched can be controlled.
"""
def __call__(self, command, tasks_per_node, nodes_per_block, walltime=None):
"""
Args:
- command (string): The command string to be launched
- tasks_per_node (int) : Workers to launch per node
- nodes_per_block (int) : Number of nodes in a block
KWargs:
- walltime (int) : This is not used by this launcher.
"""
task_blocks = tasks_per_node * nodes_per_block
cat << SLURM_EOF > cmd_$SLURM_JOB_NAME.sh
{0}
SLURM_EOF
chmod a+x cmd_$SLURM_JOB_NAME.sh
TASKBLOCKS={1}
srun --ntasks $TASKBLOCKS -l {overrides} bash cmd_$SLURM_JOB_NAME.sh
echo "Done"
'''.format(command, task_blocks, overrides=self.overrides)
return x
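# Illustrative note (hypothetical values): with tasks_per_node=4 and
# nodes_per_block=2, task_blocks = 4 * 2 = 8, so the generated script sets
# TASKBLOCKS=8 and the single srun invocation above starts 8 tasks
# (--ntasks 8) running cmd_$SLURM_JOB_NAME.sh.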
class SrunMPILauncher(Launcher):
"""Launches as many workers as MPI tasks to be executed concurrently within a block.
Use this launcher instead of SrunLauncher if each block will execute multiple MPI applications
at the same time. Workers should be launched with independent Srun calls so as to set up the
environment for MPI application launch.
"""
def __init__(self, overrides=''):
"""
Parameters
----------
overrides: str
This string will be passed to the launcher. Default: ''
"""
self.overrides = overrides
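# Illustrative usage sketch (the overrides value is hypothetical):
#
#     launcher = SrunMPILauncher(overrides='--mpi=pmi2')
#
# The overrides string is forwarded to srun, letting site-specific options
# (such as the MPI launch mechanism) be supplied per configuration.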