Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"scratch data to a local disk.",
):
self._workspace = WorkSpace(os.path.abspath(local_directory))
self._workdir = self._workspace.new_work_dir(prefix="worker-")
self.local_directory = self._workdir.dir_path
if preload is None:
preload = dask.config.get("distributed.worker.preload")
if preload_argv is None:
preload_argv = dask.config.get("distributed.worker.preload-argv")
self.preloads = preloading.process_preloads(
self, preload, preload_argv, file_dir=self.local_directory
)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("worker")
self.memory_limit = parse_memory_limit(memory_limit, self.nthreads)
self.paused = False
if "memory_target_fraction" in kwargs:
self.memory_target_fraction = kwargs.pop("memory_target_fraction")
else:
self.memory_target_fraction = dask.config.get(
"distributed.worker.memory.target"
)
if "memory_spill_fraction" in kwargs:
self.memory_spill_fraction = kwargs.pop("memory_spill_fraction")
):
self._workspace = WorkSpace(os.path.abspath(local_directory))
self._workdir = self._workspace.new_work_dir(prefix="worker-")
self.local_directory = self._workdir.dir_path
if preload is None:
preload = dask.config.get("distributed.worker.preload")
if preload_argv is None:
preload_argv = dask.config.get("distributed.worker.preload-argv")
self.preloads = preloading.process_preloads(
self, preload, preload_argv, file_dir=self.local_directory
)
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("worker")
self.memory_limit = parse_memory_limit(memory_limit, self.nthreads)
self.paused = False
if "memory_target_fraction" in kwargs:
self.memory_target_fraction = kwargs.pop("memory_target_fraction")
else:
self.memory_target_fraction = dask.config.get(
"distributed.worker.memory.target"
)
if "memory_spill_fraction" in kwargs:
self.memory_spill_fraction = kwargs.pop("memory_spill_fraction")
else:
contact_address=None,
listen_address=None,
worker_class=None,
env=None,
interface=None,
host=None,
port=None,
protocol=None,
config=None,
**worker_kwargs,
):
self._setup_logging(logger)
self.loop = loop or IOLoop.current()
if isinstance(security, dict):
security = Security(**security)
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args("worker")
if scheduler_file:
cfg = json_load_robust(scheduler_file)
self.scheduler_addr = cfg["address"]
elif scheduler_ip is None and dask.config.get("scheduler-address"):
self.scheduler_addr = dask.config.get("scheduler-address")
elif scheduler_port is None:
self.scheduler_addr = coerce_to_address(scheduler_ip)
else:
self.scheduler_addr = coerce_to_address((scheduler_ip, scheduler_port))
if protocol is None:
protocol_address = self.scheduler_addr.split("://")
# warnings.warn("The ip keyword has been moved to host")
host = ip
if diagnostics_port is not None:
warnings.warn(
"diagnostics_port has been deprecated. "
"Please use `dashboard_address=` instead"
)
dashboard_address = diagnostics_port
self.status = None
self.processes = processes
if security is None:
# Falsey values load the default configuration
security = Security()
elif security is True:
# True indicates self-signed temporary credentials should be used
security = Security.temporary()
elif not isinstance(security, Security):
raise TypeError("security must be a Security object")
if protocol is None:
if host and "://" in host:
protocol = host.split("://")[0]
elif security and security.require_encryption:
protocol = "tls://"
elif not self.processes and not scheduler_port:
protocol = "inproc://"
else:
protocol = "tcp://"
if not protocol.endswith("://"):
scheduler_file=None,
security=None,
**kwargs):
# Attributes
self.allowed_failures = allowed_failures
self.validate = validate
self.status = None
self.delete_interval = delete_interval
self.synchronize_worker_interval = synchronize_worker_interval
self.digests = None
self.service_specs = services or {}
self.services = {}
self.scheduler_file = scheduler_file
self.security = security or Security()
assert isinstance(self.security, Security)
self.connection_args = self.security.get_connection_args('scheduler')
self.listen_args = self.security.get_listen_args('scheduler')
# Communication state
self.loop = loop or IOLoop.current()
self.worker_comms = dict()
self.comms = dict()
self.coroutines = []
self._worker_coroutines = []
self._ipython_kernel = None
# Task state
self.tasks = dict()
self.task_state = dict()
self.dependencies = dict()
def __init__(
    self,
    ip=None,
    local_dir=None,
    loop=None,
    security=None,
    **kwargs
):
    """Initialise the client's address, event loop, directory and security.

    Parameters
    ----------
    ip : optional
        Stored on ``self.ip``; when falsy, ``get_ip()`` is used instead.
    local_dir : optional
        Stored on ``self.local_dir``. When omitted (``None``), a fresh
        temporary directory with the ``client-`` prefix is created for
        this instance.
    loop : optional
        Stored on ``self.loop``; when falsy, ``IOLoop.current()`` is used.
    security : Security, optional
        Stored on ``self.security``; when falsy, ``Security()`` is used.
    **kwargs
        Forwarded unchanged to the parent class initialiser.

    Raises
    ------
    AssertionError
        If the resolved security object is not a ``Security`` instance.
    """
    self.ip = ip or get_ip()
    self.loop = loop or IOLoop.current()
    # A default of ``tempfile.mkdtemp(...)`` in the signature is evaluated
    # only once, when the function is defined: it creates a directory as an
    # import-time side effect and makes every instance share it. Creating
    # the directory here gives each instance its own.
    if local_dir is None:
        local_dir = tempfile.mkdtemp(prefix="client-")
    self.local_dir = local_dir
    handlers = {"upload_file": self.upload_file, "execute": self.execute}
    self.security = security or Security()
    assert isinstance(self.security, Security)
    self.listen_args = self.security.get_listen_args("scheduler")
    super(RemoteClient, self).__init__(handlers, io_loop=self.loop, **kwargs)
def make_security(tls_cert=None, tls_key=None):
    """Build a ``Security`` object that requires encryption.

    A falsy ``tls_cert`` is replaced by the ``DASK_GATEWAY_TLS_CERT``
    environment variable, and a falsy ``tls_key`` by
    ``DASK_GATEWAY_TLS_KEY``. The resulting certificate is used as the CA
    file and as both the scheduler and worker certificate; the resulting
    key is used as both the scheduler and worker key.
    """
    cert = tls_cert if tls_cert else getenv("DASK_GATEWAY_TLS_CERT")
    key = tls_key if tls_key else getenv("DASK_GATEWAY_TLS_KEY")

    # One certificate/key pair is shared by every role.
    options = {
        "require_encryption": True,
        "tls_ca_file": cert,
        "tls_scheduler_cert": cert,
        "tls_scheduler_key": key,
        "tls_worker_cert": cert,
        "tls_worker_key": key,
    }
    return Security(**options)
interface,
death_timeout,
preload,
dashboard_prefix,
tls_ca_file,
tls_cert,
tls_key,
enable_tcp_over_ucx,
enable_infiniband,
enable_nvlink,
enable_rdmacm,
net_devices,
**kwargs,
):
if tls_ca_file and tls_cert and tls_key:
security = Security(
tls_ca_file=tls_ca_file, tls_worker_cert=tls_cert, tls_worker_key=tls_key,
)
else:
security = None
worker = CUDAWorker(
scheduler,
host,
nthreads,
name,
memory_limit,
device_memory_limit,
rmm_pool_size,
pid_file,
resources,
dashboard,
class GatewayClusterError(Exception):
    """Error raised for problems starting, stopping or scaling a gateway cluster."""
class GatewayServerError(Exception):
    """Error raised for problems in the operation of the gateway server.

    Signals an internal error inside the gateway server itself.
    """
class GatewayWarning(UserWarning):
    """Warning category used for warnings issued by the Gateway client."""
class GatewaySecurity(Security):
"""A security implementation that temporarily stores credentials on disk.
The normal ``Security`` class assumes credentials already exist on disk,
but our credentials exist only in memory. Since Python's SSLContext doesn't
support directly loading credentials from memory, we write them temporarily
to disk when creating the context, then delete them immediately."""
def __init__(self, tls_key, tls_cert):
    """Keep the given TLS key and certificate on the instance."""
    self.tls_key, self.tls_cert = tls_key, tls_cert
def __repr__(self):
    """Return a fixed placeholder string that omits the key and certificate."""
    placeholder = "GatewaySecurity<...>"
    return placeholder
def get_connection_args(self, role):
with tempfile.TemporaryDirectory() as tempdir: