Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
assert opts.option_one == 2
assert opts.option_two == "small"
# Without local config, uses server-side defaults
opts = await gateway.cluster_options(use_local_defaults=False)
assert opts.option_one == 1
assert opts.option_two == "small"
with dask.config.set(
gateway__cluster__options={"option_two": "{TEST_OPTION_TWO}"}
):
# Values can format from environment variables
opts = await gateway.cluster_options()
assert opts.option_one == 1
assert opts.option_two == "large"
with dask.config.set(
gateway__cluster__options={
"option_two": "{TEST_OPTION_TWO}",
"option_one": 3,
}
):
# Defaults are merged with kwargs to new_cluster
async with gateway.new_cluster(option_one=2) as cluster:
report = await gateway.get_cluster(cluster.name)
assert report.options == {"option_one": 2, "option_two": "large"}
# If passing `cluster_options`, defaults are assumed already applied
opts = await gateway.cluster_options()
opts.option_two = "small"
async with gateway.new_cluster(opts) as cluster:
report = await gateway.get_cluster(cluster.name)
assert report.options == {"option_one": 3, "option_two": "small"}
def _set_field(self, kwargs, field, config_name):
if field in kwargs:
out = kwargs[field]
else:
out = dask.config.get(config_name)
setattr(self, field, out)
async def connect(addr, timeout=None, deserialize=True, **connection_args):
"""
Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
and yield a ``Comm`` object. If the connection attempt fails, it is
retried until the *timeout* is expired.
"""
if timeout is None:
timeout = dask.config.get("distributed.comm.timeouts.connect")
timeout = parse_timedelta(timeout, default="seconds")
scheme, loc = parse_address(addr)
backend = registry.get_backend(scheme)
connector = backend.get_connector()
comm = None
start = time()
deadline = start + timeout
error = None
def _raise(error):
error = error or "connect() didn't finish in time"
msg = "Timed out trying to connect to %r after %s s: %s" % (
addr,
timeout,
python=sys.executable,
job_name=None,
config_name=None,
):
self.scheduler = scheduler
self.job_id = None
super().__init__()
default_config_name = self.default_config_name()
if config_name is None:
config_name = default_config_name
self.config_name = config_name
if cores is None:
cores = dask.config.get("jobqueue.%s.cores" % self.config_name)
if memory is None:
memory = dask.config.get("jobqueue.%s.memory" % self.config_name)
if cores is None or memory is None:
job_class_name = self.__class__.__name__
cluster_class_name = job_class_name.replace("Job", "Cluster")
raise ValueError(
"You must specify how much cores and memory per job you want to use, for example:\n"
"cluster = {}(cores={}, memory={!r})".format(
cluster_class_name, cores or 8, memory or "24GB"
)
)
if job_name is None:
job_name = dask.config.get("jobqueue.%s.name" % self.config_name)
if processes is None:
from __future__ import print_function, division, absolute_import
import os
import dask
import yaml
# Path of the YAML file holding this package's default configuration; it is
# expected to sit next to this module.
fn = os.path.join(os.path.dirname(__file__), "labextension.yaml")

# Per the dask docs, ensure_file copies the file into the user's dask config
# directory only if it is not already there.
dask.config.ensure_file(source=fn)

# Decode explicitly as UTF-8 so parsing does not depend on the platform's
# locale encoding (e.g. cp1252 on Windows). safe_load is used deliberately:
# it does not construct arbitrary Python objects from the YAML.
with open(fn, encoding="utf-8") as f:
    defaults = yaml.safe_load(f)

# Register the parsed mapping as defaults in dask's global configuration.
dask.config.update_defaults(defaults)
**base_class_kwargs
):
super().__init__(
scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs
)
if queue is None:
queue = dask.config.get("jobqueue.%s.queue" % self.config_name)
if project is None:
project = dask.config.get("jobqueue.%s.project" % self.config_name)
if resource_spec is None:
resource_spec = dask.config.get(
"jobqueue.%s.resource-spec" % self.config_name
)
if walltime is None:
walltime = dask.config.get("jobqueue.%s.walltime" % self.config_name)
if job_extra is None:
job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name)
header_lines = []
if self.job_name is not None:
header_lines.append("#OAR -n %s" % self.job_name)
if queue is not None:
header_lines.append("#OAR -q %s" % queue)
if project is not None:
header_lines.append("#OAR --project %s" % project)
# OAR needs to have the resource on a single line otherwise it is
# considered as a "moldable job" (i.e. the scheduler can chose between
# multiple sets of resources constraints)
resource_spec_list = []
if resource_spec is None:
# we guarantee partition order is preserved when its saved and read
# so we enforce name_function to maintain the order of its input.
if not (single_file and single_node):
formatted_names = [name_function(i) for i in range(df.npartitions)]
if formatted_names != sorted(formatted_names):
warn(
"To preserve order between partitions name_function "
"must preserve the order of its input"
)
# If user did not specify scheduler and write is sequential default to the
# sequential scheduler. otherwise let the _get method choose the scheduler
if (
scheduler is None
and not config.get("scheduler", None)
and single_node
and single_file
):
scheduler = "single-threaded"
# handle lock default based on whether we're writing to a single entity
_actual_get = get_scheduler(collections=[df], scheduler=scheduler)
if lock is None:
if not single_node:
lock = True
elif not single_file and _actual_get is not multiprocessing.get:
# if we're writing to multiple files with the multiprocessing
# scheduler we don't need to lock
lock = True
else:
lock = False
scheduler=None,
name=None,
queue=None,
project=None,
resource_spec=None,
walltime=None,
job_extra=None,
config_name=None,
**base_class_kwargs
):
super().__init__(
scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs
)
if queue is None:
queue = dask.config.get("jobqueue.%s.queue" % self.config_name)
if resource_spec is None:
resource_spec = dask.config.get(
"jobqueue.%s.resource-spec" % self.config_name
)
if walltime is None:
walltime = dask.config.get("jobqueue.%s.walltime" % self.config_name)
if job_extra is None:
job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name)
if project is None:
project = dask.config.get(
"jobqueue.%s.project" % self.config_name
) or os.environ.get("PBS_ACCOUNT")
# Try to find a project name from environment variable
project = project or os.environ.get("PBS_ACCOUNT")
def setup_dask(num_workers):
    """Register a scheduler and worker pool in dask's global config.

    The scheduler name is set first. A fresh ``ThreadPool`` with
    ``num_workers`` workers is then created and stored under the ``pool``
    config key. Both calls to ``dask.config.set`` happen outside a ``with``
    block, so the settings persist for the rest of the process.

    NOTE(review): the scheduler is "multiprocessing" while the pool is a
    thread pool. Confirm this pairing is intentional. Also, every call
    creates a new pool without closing the previously registered one.
    """
    dask.config.set(scheduler="multiprocessing")
    # Build the pool only after the scheduler is set, matching the original
    # evaluation order.
    thread_pool = ThreadPool(num_workers)
    dask.config.set(pool=thread_pool)
import dask
from .comm.addressing import get_address_host
from .core import CommClosedError
from .diagnostics.plugin import SchedulerPlugin
from .utils import log_errors, parse_timedelta
from tlz import topk
# Latency constant (10e-3 == 0.01); presumably seconds, i.e. 10 ms. Its uses
# are not visible here — TODO confirm the unit against callers.
LATENCY = 10e-3
# log(2), computed once at import time. `log` is presumably math.log (the
# import is not visible in this chunk) — verify.
log_2 = log(2)
logger = logging.getLogger(__name__)
# Evaluated once at import time, so later changes to the
# "distributed.admin.pdb-on-err" config key are not reflected here.
LOG_PDB = dask.config.get("distributed.admin.pdb-on-err")
class WorkStealing(SchedulerPlugin):
def __init__(self, scheduler):
self.scheduler = scheduler
# { level: { task states } }
self.stealable_all = [set() for i in range(15)]
# { worker: { level: { task states } } }
self.stealable = dict()
# { task state: (worker, level) }
self.key_stealable = dict()
self.cost_multipliers = [1 + 2 ** (i - 6) for i in range(15)]
self.cost_multipliers[0] = 1
for worker in scheduler.workers: