Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import paramiko
from parsl.channels.base import Channel
from parsl.channels.errors import BadHostKeyException, AuthException, SSHException, BadScriptPath, BadPermsScriptPath, FileCopyException, FileExists
from parsl.utils import RepresentationMixin
logger = logging.getLogger(__name__)
class NoAuthSSHClient(paramiko.SSHClient):
    """SSH client whose authentication step uses only the "none" method.

    NOTE(review): presumably ``_auth`` overrides the hook of the same name
    on ``paramiko.SSHClient`` -- confirm against the paramiko version in use.
    """

    def _auth(self, username, *args):
        """Authenticate ``username`` through the transport's ``auth_none``.

        All additional positional arguments are accepted and ignored.
        Implicitly returns ``None``.
        """
        self._transport.auth_none(username)
class SSHChannel(Channel, RepresentationMixin):
''' SSH persistent channel. This enables remote execution on sites
accessible via ssh. It is assumed that the user has setup host keys
so as to ssh to the remote host. Which goes to say that the following
test on the commandline should work:
>>> ssh <username>@<hostname>
'''
def __init__(self, hostname, username=None, password=None, script_dir=None, envs=None, gssapi_auth=False, skip_auth=False, port=22, **kwargs):
''' Initialize a persistent connection to the remote system.
We should know at this point whether ssh connectivity is possible
Args:
- hostname (String) : Hostname
host='http://localhost',
port=8899):
"""
Parameters
----------
host : str
The hostname for running the visualization interface.
port : int
The port for the visualization interface
"""
self.host = host
self.port = port
class Monitoring(RepresentationMixin):
""" This is a config class for monitoring. """
def __init__(self,
store=None,
visualization_server=None,
monitoring_interval=15,
workflow_name=None,
version='1.0.0'):
""" Initializes a monitoring configuration class.
Parameters
----------
monitoring_interval : float, optional
The amount of time in seconds to sleep in between resource monitoring logs per task.
workflow_name : str, optional
Name to record as the workflow base name, defaults to the name of the parsl script file if left as None.
'F': 'FAILED', # (failed),
'TO': 'TIMEOUT', # (timeout),
'NF': 'FAILED', # (node failure),
'RV': 'FAILED', # (revoked) and
'SE': 'FAILED'
} # (special exit state
# Shell command sequence: change to the home directory, refresh the apt
# package index, install python3 / python3-pip / ipython via apt, then install
# ipyparallel and parsl via pip3 (all package steps run under sudo).
# NOTE(review): presumably executed as a bootstrap script on a freshly
# provisioned instance -- confirm where template_string is consumed.
template_string = """
cd ~
sudo apt-get update -y
sudo apt-get install -y python3 python3-pip ipython
sudo pip3 install ipyparallel parsl
"""
class AzureProvider(ExecutionProvider, RepresentationMixin):
"""A provider for using Azure resources.
Parameters
----------
profile : str
Profile to be used if different from the standard Azure config file ~/.azure/config.
template_file : str
Location of template file for Azure instance. Default is 'templates/template.json'.
walltime : str
Walltime requested per block in HH:MM:SS.
azure_template_file : str
Path to the template file for the Azure instance.
init_blocks : int
Number of blocks to provision at the start of the run. Default is 1.
min_blocks : int
Minimum number of blocks to maintain. Default is 0.
except Exception as e:
print("Exception during pickling {}".format(e))
return
try:
x = self.sock.sendto(buffer, (self.ip, self.port))
except socket.timeout:
print("Could not send message within timeout limit")
return False
return x
def __del__(self):
    """Close the socket held in ``self.sock`` when the object is finalized.

    A finalizer can run on a partially constructed instance (for example
    when ``__init__`` raised before ``self.sock`` was assigned). The
    attribute is therefore looked up defensively so that finalization does
    not raise a spurious ``AttributeError``.
    """
    sock = getattr(self, "sock", None)
    if sock is not None:
        sock.close()
class MonitoringHub(RepresentationMixin):
def __init__(self,
hub_address,
hub_port=None,
hub_port_range=(55050, 56000),
client_address="127.0.0.1",
client_port_range=(55000, 56000),
workflow_name=None,
workflow_version=None,
logging_endpoint='sqlite:///monitoring.db',
logdir=None,
logging_level=logging.INFO,
resource_monitoring_enabled=True,
resource_monitoring_interval=30): # in seconds
"""
host='http://localhost',
port=8899):
"""
Parameters
----------
host : str
The hostname for running the visualization interface.
port : int
The port for the visualization interface
"""
self.host = host
self.port = port
class Monitoring(RepresentationMixin):
""" This is a config class for monitoring. """
def __init__(self,
store=None,
visualization_server=None,
monitoring_interval=15,
workflow_name=None,
version='1.0.0'):
""" Initializes a monitoring configuration class.
Parameters
----------
monitoring_interval : float, optional
The amount of time in seconds to sleep in between resource monitoring logs per task.
workflow_name : str, optional
Name to record as the workflow base name, defaults to the name of the parsl script file if left as None.
import logging
import sys
import concurrent.futures as cf
from parsl.executors.base import ParslExecutor
from parsl.dataflow.error import ConfigurationError
from parsl.utils import RepresentationMixin
logger = logging.getLogger(__name__)
class ThreadPoolExecutor(ParslExecutor, RepresentationMixin):
"""A thread-based executor.
Parameters
----------
max_threads : int
Number of threads. Default is 2.
thread_name_prefix : string
Thread name prefix (only supported in python v3.6+).
storage_access : list of :class:`~parsl.data_provider.scheme.Scheme`
Specifications for accessing data this executor remotely. Multiple `Scheme`s are not yet supported.
managed : bool
If True, parsl will control dynamic scaling of this executor, and be responsible. Otherwise,
this is managed by the user.
"""
def __init__(self, label='threads', max_threads=2, thread_name_prefix='', storage_access=None, working_dir=None, managed=True):
def can_stage_in(self, file):
    """Report whether ``file`` can be staged in by this provider.

    Emits a debug log line containing ``repr(file)`` and returns ``True``
    exactly when ``file.scheme`` equals ``'ftp'``.
    """
    description = repr(file)
    logger.debug("FTPSeparateTaskStaging checking file {}".format(description))
    is_ftp = file.scheme == 'ftp'
    return is_ftp
def stage_in(self, dm, executor, file, parent_fut):
    """Stage ``file`` in by launching a separate FTP stage-in app.

    Sets ``file.local_path`` to ``<working_dir>/<filename>`` when the
    executor named ``executor`` has a truthy ``working_dir``, otherwise to
    the bare filename. It then builds the stage-in app with
    ``_ftp_stage_in_app`` and invokes it with ``file`` as its only output.

    Returns the first entry of the resulting app future's ``_outputs``.
    """
    target_dir = dm.dfk.executors[executor].working_dir
    # Place the file under the working directory only when one is configured.
    file.local_path = os.path.join(target_dir, file.filename) if target_dir else file.filename
    ftp_app = _ftp_stage_in_app(dm, executor=executor)
    future = ftp_app(
        target_dir,
        outputs=[file],
        staging_inhibit_output=True,
        parent_fut=parent_fut,
    )
    return future._outputs[0]
class FTPInTaskStaging(Staging, RepresentationMixin):
"""Performs FTP staging as a wrapper around the application task."""
def can_stage_in(self, file):
    """Report whether ``file`` can be staged in by this provider.

    Emits a debug log line containing ``repr(file)`` and returns ``True``
    exactly when ``file.scheme`` equals ``'ftp'``.
    """
    # Use the repr() builtin rather than calling the dunder method directly;
    # this also matches the sibling staging provider's logging call.
    logger.debug("FTPInTaskStaging checking file {}".format(repr(file)))
    return file.scheme == 'ftp'
def stage_in(self, dm, executor, file, parent_fut):
    """Assign ``file.local_path`` for in-task staging and return ``file``.

    The path is ``<working_dir>/<filename>`` when the executor named
    ``executor`` has a truthy ``working_dir``; otherwise it is the bare
    filename. ``parent_fut`` is accepted but not used here.
    """
    target_dir = dm.dfk.executors[executor].working_dir
    if not target_dir:
        # No working directory configured: keep the path relative.
        file.local_path = file.filename
        return file
    file.local_path = os.path.join(target_dir, file.filename)
    return file
def replace_task(self, dm, executor, file, f):
import logging
import os
import signal
import time
from parsl.channels import LocalChannel
from parsl.launchers import SingleNodeLauncher
from parsl.providers.provider_base import ExecutionProvider
from parsl.providers.error import SchedulerMissingArgs, ScriptPathError
from parsl.utils import RepresentationMixin
logger = logging.getLogger(__name__)
class LocalProvider(ExecutionProvider, RepresentationMixin):
""" Local Execution Provider
This provider is used to provide execution resources from the localhost.
Parameters
----------
min_blocks : int
Minimum number of blocks to maintain.
max_blocks : int
Maximum number of blocks to maintain.
parallelism : float
Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive
scaling where as many resources as possible are used; parallelism close to 0 represents
the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
move_files : Optional[Bool]: should files be moved? by default, Parsl will try to figure