Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'''
job_name = "{0}.{1}".format(job_name, time.time())
# Set script path
script_path = "{0}/{1}.sh".format(self.script_dir, job_name)
script_path = os.path.abspath(script_path)
wrap_command = self.worker_init + '\n' + self.launcher(command, tasks_per_node, self.nodes_per_block)
self._write_submit_script(wrap_command, script_path)
job_id = None
proc = None
remote_pid = None
if (self.move_files is None and not isinstance(self.channel, LocalChannel)) or (self.move_files):
logger.debug("Pushing start script")
script_path = self.channel.push_file(script_path, self.channel.script_dir)
if not isinstance(self.channel, LocalChannel):
logger.debug("Launching in remote mode")
# Bash would return until the streams are closed. So we redirect to a outs file
cmd = 'bash {0} > {0}.out 2>&1 & \n echo "PID:$!" '.format(script_path)
retcode, stdout, stderr = self.channel.execute_wait(cmd, self.cmd_timeout)
for line in stdout.split('\n'):
if line.startswith("PID:"):
remote_pid = line.split("PID:")[1].strip()
job_id = remote_pid
if job_id is None:
logger.warning("Channel failed to start remote command/retrieve PID")
else:
# Set script path
script_path = "{0}/{1}.sh".format(self.script_dir, job_name)
script_path = os.path.abspath(script_path)
wrap_command = self.worker_init + '\n' + self.launcher(command, tasks_per_node, self.nodes_per_block)
self._write_submit_script(wrap_command, script_path)
job_id = None
proc = None
remote_pid = None
if (self.move_files is None and not isinstance(self.channel, LocalChannel)) or (self.move_files):
logger.debug("Moving start script")
script_path = self.channel.push_file(script_path, self.channel.script_dir)
if not isinstance(self.channel, LocalChannel):
logger.debug("Launching in remote mode")
# Bash would return until the streams are closed. So we redirect to a outs file
cmd = 'bash {0} &> {0}.out & \n echo "PID:$!" '.format(script_path)
retcode, stdout, stderr = self.channel.execute_wait(cmd, self.cmd_timeout)
for line in stdout.split('\n'):
if line.startswith("PID:"):
remote_pid = line.split("PID:")[1].strip()
job_id = remote_pid
if job_id is None:
logger.warning("Channel failed to start remote command/retrieve PID")
else:
try:
job_id, proc = self.channel.execute_no_wait('bash {0}'.format(script_path), self.cmd_timeout)
except Exception as e:
logger.debug("Channel execute failed for: {}, {}".format(self.channel, e))
import logging
from parsl.config import Config
from parsl.providers import LocalProvider
from parsl.channels import LocalChannel
from parsl.launchers import SimpleLauncher
from parsl.executors import ExtremeScaleExecutor
ncore=2
config = Config(
executors=[
ExtremeScaleExecutor(
label="Extreme_Local",
worker_debug=True,
ranks_per_node=ncore,
provider=LocalProvider(
channel=LocalChannel(),
init_blocks=1,
max_blocks=1,
launcher=SimpleLauncher()
)
)
],
strategy=None,
)
parsl.load(config)
mol=gto.M(atom='H 0. 0. 0.; H 0. 0. 2.0',unit='bohr',
ecp='bfd', basis='bfd_vtz')
mf = scf.RHF(mol).run()
mol.output=None
mol.stdout=None
def __init__(self,
channel=LocalChannel(),
nodes_per_block=1,
launcher=SingleNodeLauncher(),
init_blocks=4,
min_blocks=0,
max_blocks=10,
walltime="00:15:00",
worker_init='',
cmd_timeout=30,
parallelism=1,
move_files=None):
self.channel = channel
self._label = 'local'
self.provisioned_blocks = 0
self.nodes_per_block = nodes_per_block
self.launcher = launcher
self.worker_init = worker_init
def __init__(self,
channel=LocalChannel(),
nodes_per_block=1,
init_blocks=0,
min_blocks=0,
max_blocks=10,
parallelism=1,
walltime="00:10:00",
account=None,
queue=None,
scheduler_options='',
worker_init='',
launcher=AprunLauncher(),
cmd_timeout=10):
label = 'cobalt'
super().__init__(label,
channel=channel,
nodes_per_block=nodes_per_block,
if args.executor == 'ThreadPool':
config = Config(
executors=[ThreadPoolExecutor(
#label='threads',
label='htex_local',
max_threads=5)
],
)
elif args.executor == 'HighThroughput':
config = Config(
executors=[
HighThroughputExecutor(
label="htex_local",
cores_per_worker=1,
provider=LocalProvider(
channel=LocalChannel(),
init_blocks=1,
max_blocks=1,
# tasks_per_node=1, # For HighThroughputExecutor, this option sho<
launcher=SingleNodeLauncher(),
),
)
],
#strategy='htex_aggressive',
#strategy='htex_totaltime',
strategy='simple',
)
# TODO:
#try:
#except:
# raise NameError("Invalid parsed argument")
def __init__(self,
channel=LocalChannel(),
nodes_per_block=1,
init_blocks=0,
min_blocks=0,
max_blocks=10,
parallelism=1,
walltime="00:10:00",
account=None,
queue=None,
scheduler_options='',
worker_init='',
launcher=AprunLauncher(),
cmd_timeout=10):
label = 'cobalt'
super().__init__(label,
channel=channel,
nodes_per_block=nodes_per_block,
if args.executor == 'ThreadPool':
config = Config(
executors=[ThreadPoolExecutor(
#label='threads',
label='htex_local',
max_threads=5)
],
)
elif args.executor == 'HighThroughput':
config = Config(
executors=[
HighThroughputExecutor(
label="htex_local",
cores_per_worker=1,
provider=LocalProvider(
channel=LocalChannel(),
init_blocks=1,
max_blocks=1,
# tasks_per_node=1, # For HighThroughputExecutor, this option sho<
launcher=SingleNodeLauncher(),
),
)
],
#strategy='htex_aggressive',
strategy='htex_totaltime',
#strategy='simple',
)
# TODO:
#try:
#except:
# raise NameError("Invalid parsed argument")
# worker_ports=(50078, 50079),
# Set a port range from which ports should be picked.
worker_port_range=(40010, 40020),
# The fabric_threaded.py script launches the MPI version
# launch_cmd="./cleanup.sh ; mpiexec -np 4 python3 fabric_threaded.py {debug} --task_url={task_url} --result_url={result_url}",
# launch_cmd="./cleanup.sh ; fabric_single_node.py {debug} --task_url={task_url} --result_url={result_url} ",
# launch_cmd="./cleanup.sh ",
# launch_cmd="sleep 600",
# Enable engine debug logging
engine_debug=True,
provider=LocalProvider(
channel=LocalChannel(),
init_blocks=1,
max_blocks=1,
tasks_per_node=1,
)
)
],
strategy=None,
)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
logger.addHandler(handler)
_logging = logger.setLevel(logging.INFO)
return _logging
# config
config = Config(
executors=[
HighThroughputExecutor(
label="local_threads",
#label="htex_local",
# worker_debug=True,
cores_per_worker=1,
provider=LocalProvider(
channel=LocalChannel(),
init_blocks=1,
max_blocks=4,
# tasks_per_node=1, # For HighThroughputExecutor, this option sho<
launcher=SingleNodeLauncher(),
),
)
],
#strategy='htex_aggressive',
strategy='htex_totaltime',
#strategy='simple',
)
# Load config
parsl.load(config)
@python_app