Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
#needed_time = args.tasks_per_trial * args.trials * 2 / target_workers
#if needed_time <= 1800: needed_time = 1800
walltime = time.strftime('%H:%M:%S', time.gmtime(needed_time))
print("The walltime for {} workers is {}".format(target_workers, walltime))
if target_workers % args.cores_per_node != 0:
nodes_per_block = 1
tasks_per_node = target_workers % args.cores_per_node
else:
nodes_per_block = int(target_workers / args.cores_per_node)
tasks_per_node = args.cores_per_node
config = Config(
executors=[
HighThroughputExecutor(
label="funcx_local",
# worker_debug=True,
worker_mode="singularity_reuse",
container_image=os.path.expanduser("~/sing-run.simg"),
cores_per_worker=int(args.cores_per_node / tasks_per_node),
max_workers=1,
address=address_by_interface("eth0"),
provider=CobaltProvider(
launcher=SingleNodeLauncher(),
init_blocks=1,
max_blocks=1,
queue=args.queue,
account='DLHub',
worker_init="source activate funcx_5"
),
)
'%s/lib/python3.6/site-packages' % pyenv_relpath,
x509_proxy,
htex_label)
if condor_cfg is None:
condor_cfg = '''
transfer_output_files = %s
RequestMemory = %d
RequestCpus = %d
''' % (htex_label, mem_per_core * cores_per_job, cores_per_job)
xfer_files = [pyenv_dir, osp.join(grid_proxy_dir, x509_proxy)]
condor_htex = Config(
executors=[
HighThroughputExecutor(
label=htex_label,
address=address_by_hostname(),
prefetch_capacity=0,
cores_per_worker=1,
max_workers=cores_per_job,
worker_logdir_root='./',
provider=CondorProvider(
channel=LocalChannel(),
init_blocks=total_workers,
max_blocks=max_workers,
nodes_per_block=1,
worker_init=wrk_init,
transfer_input_files=xfer_files,
scheduler_options=condor_cfg
),
)
# Here we can add htex_strategy for option
# config
print(type(args.executor) )
if args.executor == 'ThreadPool':
config = Config(
executors=[ThreadPoolExecutor(
#label='threads',
label='htex_local',
max_threads=5)
],
)
elif args.executor == 'HighThroughput':
config = Config(
executors=[
HighThroughputExecutor(
label="htex_local",
cores_per_worker=1,
provider=LocalProvider(
channel=LocalChannel(),
init_blocks=1,
max_blocks=1,
# tasks_per_node=1, # For HighThroughputExecutor, this option sho<
launcher=SingleNodeLauncher(),
),
)
],
#strategy='htex_aggressive',
#strategy='htex_totaltime',
strategy='simple',
)
# TODO:
from parsl.providers import LocalProvider
from parsl.channels import LocalChannel
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from ..executor import _futures_handler
from .timeout import timeout
try:
from collections.abc import Sequence
except ImportError:
from collections import Sequence
_default_cfg = Config(
executors=[
HighThroughputExecutor(
label="coffea_parsl_default",
cores_per_worker=1,
provider=LocalProvider(
channel=LocalChannel(),
init_blocks=1,
max_blocks=1,
),
)
],
strategy=None,
)
def _parsl_initialize(config=None):
parsl.clear()
parsl.load(config)
# since we can't know slots ahead of time.
tasks_per_node = 1
elif isinstance(executor, ExtremeScaleExecutor):
tasks_per_node = executor.ranks_per_node
nodes_per_block = executor.provider.nodes_per_block
parallelism = executor.provider.parallelism
running = sum([1 for x in status if x == 'RUNNING'])
submitting = sum([1 for x in status if x == 'SUBMITTING'])
pending = sum([1 for x in status if x == 'PENDING'])
active_blocks = running + submitting + pending
active_slots = active_blocks * tasks_per_node * nodes_per_block
if (isinstance(executor, IPyParallelExecutor) or
isinstance(executor, HighThroughputExecutor) or
isinstance(executor, ExtremeScaleExecutor)):
logger.debug('Executor {} has {} active tasks, {}/{}/{} running/submitted/pending blocks, and {} connected engines'.format(
label, active_tasks, running, submitting, pending, len(executor.connected_workers)))
else:
logger.debug('Executor {} has {} active tasks and {}/{}/{} running/submitted/pending blocks'.format(
label, active_tasks, running, submitting, pending))
# reset kill timer if executor has active tasks
if active_tasks > 0 and self.executors[executor.label]['idle_since']:
self.executors[executor.label]['idle_since'] = None
# Case 1
# No tasks.
if active_tasks == 0:
# Case 1a
# Fewer blocks that min_blocks
# Here we can add htex_strategy for option
# config
if args.executor == 'ThreadPool':
config = Config(
executors=[ThreadPoolExecutor(
#label='threads',
label='htex_local',
max_threads=5)
],
)
elif args.executor == 'HighThroughput_Local':
config = Config(
executors=[
HighThroughputExecutor(
label="htex_local",
cores_per_worker=1,
provider=LocalProvider(
channel=LocalChannel(),
init_blocks=1,
max_blocks=1,
# tasks_perss_node=1, # For HighThroughputExecutor, this option sho<
launcher=SingleNodeLauncher(),
),
)
],
#strategy='htex_aggressive',
#strategy='htex_totaltime',
#strategy='simple',
strategy=args.strategy,
)
"address": address_by_hostname(),
**settings.parsl.executor.dict(skip_defaults=True),
}
else:
parsl_executor_construct = {
"label": "QCFractal_Parsl_{}_Executor".format(settings.cluster.scheduler.title()),
"cores_per_worker": cores_per_task,
"max_workers": settings.common.tasks_per_worker,
"provider": provider,
"address": address_by_hostname(),
**settings.parsl.executor.dict(skip_defaults=True),
}
queue_client = Config(
retries=settings.common.retries, executors=[HighThroughputExecutor(**parsl_executor_construct)]
)
else:
raise KeyError(
"Unknown adapter type '{}', available options: {}.\n"
"This code should also be unreachable with pydantic Validation, so if "
"you see this message, please report it to the QCFractal GitHub".format(
settings.common.adapter, [getattr(AdapterEnum, v).value for v in AdapterEnum]
)
)
# Build out the manager itself
# Compute max tasks
max_concurrent_tasks = settings.common.tasks_per_worker * settings.common.max_workers
if settings.manager.max_queued_tasks is None:
# Tasks * jobs * buffer + 1
if not executor.scaling_enabled:
continue
# Tasks that are either pending completion
active_tasks = executor.outstanding
status = executor.status()
self.unset_logging()
# FIXME we need to handle case where provider does not define these
# FIXME probably more of this logic should be moved to the provider
min_blocks = executor.provider.min_blocks
max_blocks = executor.provider.max_blocks
if isinstance(executor, IPyParallelExecutor):
tasks_per_node = executor.workers_per_node
elif isinstance(executor, HighThroughputExecutor):
# This is probably wrong calculation, we need this to come from the executor
# since we can't know slots ahead of time.
tasks_per_node = 1
elif isinstance(executor, ExtremeScaleExecutor):
tasks_per_node = executor.ranks_per_node
nodes_per_block = executor.provider.nodes_per_block
parallelism = executor.provider.parallelism
running = sum([1 for x in status if x == 'RUNNING'])
submitting = sum([1 for x in status if x == 'SUBMITTING'])
pending = sum([1 for x in status if x == 'PENDING'])
active_blocks = running + submitting + pending
active_slots = active_blocks * tasks_per_node * nodes_per_block
if (isinstance(executor, IPyParallelExecutor) or
logger.setLevel(logging.INFO)
@python_app
def _logger(*args):
import logging
logger = logging.getLogger()
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
logger.addHandler(handler)
_logging = logger.setLevel(logging.INFO)
return _logging
# config
config = Config(
executors=[
HighThroughputExecutor(
label="local_threads",
#label="htex_local",
# worker_debug=True,
cores_per_worker=1,
provider=LocalProvider(
channel=LocalChannel(),
init_blocks=1,
max_blocks=4,
# tasks_per_node=1, # For HighThroughputExecutor, this option sho<
launcher=SingleNodeLauncher(),
),
)
],
#strategy='htex_aggressive',
strategy='htex_totaltime',
#strategy='simple',
# Here we can add htex_strategy for option
# config
if args.executor == 'ThreadPool':
config = Config(
executors=[ThreadPoolExecutor(
#label='threads',
label='htex_local',
max_threads=5)
],
)
elif args.executor == 'HighThroughput_Local':
config = Config(
executors=[
HighThroughputExecutor(
label="htex_local",
cores_per_worker=1,
provider=LocalProvider(
channel=LocalChannel(),
init_blocks=1,
max_blocks=1,
# tasks_perss_node=1, # For HighThroughputExecutor, this option sho<
launcher=SingleNodeLauncher(),
),
)
],
#strategy='htex_aggressive',
#strategy='htex_totaltime',
#strategy='simple',
strategy=args.strategy,
)