Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'host': gethostname(),
}
if self.monitoring:
self.monitoring.send(MessageType.WORKFLOW_INFO,
workflow_info)
checkpoints = self.load_checkpoints(config.checkpoint_files)
self.memoizer = Memoizer(self, memoize=config.app_cache, checkpoint=checkpoints)
self.checkpointed_tasks = 0
self._checkpoint_timer = None
self.checkpoint_mode = config.checkpoint_mode
self.data_manager = DataManager(self)
self.executors = {}
data_manager_executor = ThreadPoolExecutor(max_threads=config.data_management_max_threads, label='data_manager')
self.add_executors(config.executors + [data_manager_executor])
if self.checkpoint_mode == "periodic":
try:
h, m, s = map(int, config.checkpoint_period.split(':'))
checkpoint_period = (h * 3600) + (m * 60) + s
self._checkpoint_timer = Timer(self.checkpoint, interval=checkpoint_period, name="Checkpoint")
except Exception:
logger.error("invalid checkpoint_period provided: {0} expected HH:MM:SS".format(config.checkpoint_period))
self._checkpoint_timer = Timer(self.checkpoint, interval=(30 * 60), name="Checkpoint")
# if we use the functionality of dynamically adding executors
# all executors should be managed.
if any([x.managed for x in config.executors]):
self.flowcontrol = FlowControl(self)
else:
from parsl.config import Config
from parsl.executors.threads import ThreadPoolExecutor
# Example configuration: a single thread-pool executor labelled
# 'local_threads_checkpoint_dfk_exit', with checkpoint_mode set to 'dfk_exit'.
config = Config(
    executors=[ThreadPoolExecutor(label='local_threads_checkpoint_dfk_exit')],
    checkpoint_mode='dfk_exit',
)
from parsl.config import Config
from parsl.executors.threads import ThreadPoolExecutor
# Example configuration: a single thread-pool executor labelled
# 'local_threads_checkpoint_periodic', with checkpoint_mode set to 'periodic'
# and an explicit checkpoint_period string of '00:00:05'.
config = Config(
    executors=[ThreadPoolExecutor(label='local_threads_checkpoint_periodic')],
    checkpoint_mode='periodic',
    checkpoint_period='00:00:05',
)
def __init__(self):
"""Constructor for the execution provider factory.
Args:
None
"""
self.executors = {'ipp': IPyParallelExecutor,
'swift_t': TurbineExecutor,
'threads': ThreadPoolExecutor,
None: lambda *args, **kwargs: None}
self.execution_providers = {'local': Local,
# Cloud Systems
'aws': EC2Provider,
'googleCloud': GoogleCloud,
'azure': AzureProvider,
# Cluster/HPC systems
'slurm': Slurm,
'cobalt': Cobalt,
'condor': Condor,
'torque': Torque,
'gridEngine': GridEngine,
None: lambda *args, **kwargs: None}
self.channels = {'ssh': SshChannel,
from parsl.config import Config
from parsl.executors.threads import ThreadPoolExecutor
# Example configuration: a single thread-pool executor labelled
# 'local_threads_checkpoint_task_exit', with checkpoint_mode set to 'task_exit'.
config = Config(
    executors=[ThreadPoolExecutor(label='local_threads_checkpoint_task_exit')],
    checkpoint_mode='task_exit',
)
def __init__(self,
             executors=None,
             app_cache=True,
             checkpoint_files=None,
             checkpoint_mode=None,
             checkpoint_period=None,
             data_management_max_threads=10,
             lazy_errors=True,
             retries=0,
             run_dir='runinfo',
             strategy='simple',
             monitoring=None,
             usage_tracking=False):
    """Store the executor and checkpoint settings on the instance.

    Args:
        executors: List of executors; when ``None`` a list holding one
            default ``ThreadPoolExecutor()`` is used instead.
        app_cache: Stored unchanged as ``self.app_cache``.
        checkpoint_files: Stored unchanged as ``self.checkpoint_files``.
        checkpoint_mode: Stored unchanged as ``self.checkpoint_mode``.
        checkpoint_period: Stored as ``self.checkpoint_period``. When it is
            ``None`` and ``checkpoint_mode`` is ``'periodic'`` the value
            ``"00:30:00"`` is stored instead. When it is supplied but
            ``checkpoint_mode`` is not ``'periodic'`` a debug message is
            logged and the value is stored as given.

    The remaining parameters (``data_management_max_threads``,
    ``lazy_errors``, ``retries``, ``run_dir``, ``strategy``, ``monitoring``,
    ``usage_tracking``) are accepted but are not referenced in the portion
    of the constructor shown here.
    """
    self.executors = [ThreadPoolExecutor()] if executors is None else executors
    self.app_cache = app_cache
    self.checkpoint_files = checkpoint_files
    self.checkpoint_mode = checkpoint_mode

    if checkpoint_period is None:
        # No period supplied: only periodic mode needs a fallback value.
        if checkpoint_mode == 'periodic':
            checkpoint_period = "00:30:00"
    elif checkpoint_mode is None:
        # A period without any checkpoint mode is noted at debug level.
        message = 'The requested `checkpoint_period={}` will have no effect because `checkpoint_mode=None`'
        logger.debug(message.format(checkpoint_period))
    elif checkpoint_mode != 'periodic':
        # A period combined with a non-periodic mode is noted at debug level.
        message = "Requested checkpoint period of {} only has an effect with checkpoint_mode='periodic'"
        logger.debug(message.format(checkpoint_period))

    self.checkpoint_period = checkpoint_period
from parsl.config import Config
from parsl.executors.threads import ThreadPoolExecutor
# Example configuration: one thread-pool executor created with max_threads=4,
# and app_cache switched off.
config = Config(
    executors=[ThreadPoolExecutor(max_threads=4)],
    app_cache=False,
)
from parsl.config import Config
from parsl.data_provider.scheme import GlobusScheme
from parsl.executors.threads import ThreadPoolExecutor
# This is an example config, make sure to
# replace the specific values below with the literal values
# (e.g., 'USERNAME' -> 'your_username')
config = Config(
executors=[
ThreadPoolExecutor(
label='local_threads_globus',
storage_access=[GlobusScheme(
endpoint_uuid='UUID', # Please replace UUID with your uuid
endpoint_path='PATH' # Please replace PATH with your path
)],
working_dir='PATH' # Please replace PATH with your path
)
"""The following config uses threads say for local lightweight apps and IPP workers for
heavyweight applications.
The app decorator has a parameter `executors=[]` to specify the executor to which
apps should be directed.
"""
from parsl.providers import LocalProvider
from parsl.config import Config
from parsl.executors.ipp import IPyParallelExecutor
from parsl.executors.threads import ThreadPoolExecutor
config = Config(
executors=[
ThreadPoolExecutor(max_threads=4, label='local_threads'),
IPyParallelExecutor(
label='local_ipp',
engine_dir='engines',
provider=LocalProvider(
walltime="00:05:00",
nodes_per_block=1,
init_blocks=4
)
from parsl.config import Config
from parsl.data_provider.globus import GlobusScheme
from parsl.executors.threads import ThreadPoolExecutor
# This is an example config, make sure to
# replace the specific values below with the literal values
# (e.g., 'USERNAME' -> 'your_username')
config = Config(
executors=[
ThreadPoolExecutor(
label='local_threads_globus',
storage_access=[GlobusScheme(
endpoint_uuid='UUID', # Please replace UUID with your uuid
endpoint_path='PATH' # Please replace PATH with your path
)],
working_dir='PATH' # Please replace PATH with your path
)