import importlib
import multiprocessing
import sys

import dask

# WINDOWS and PYPY are module-level platform flags defined elsewhere in distributed.


def _initialize_mp_context():
    if WINDOWS or PYPY:
        return multiprocessing
    else:
        method = dask.config.get("distributed.worker.multiprocessing-method")
        ctx = multiprocessing.get_context(method)
        # Makes the test suite much faster
        preload = ["distributed"]
        if "pkg_resources" in sys.modules:
            preload.append("pkg_resources")
        from .versions import required_packages, optional_packages

        for pkg, _ in required_packages + optional_packages:
            try:
                importlib.import_module(pkg)
            except ImportError:
                pass
            else:
                preload.append(pkg)
        ctx.set_forkserver_preload(preload)
        return ctx
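The helper above lets the `distributed.worker.multiprocessing-method` config entry decide which multiprocessing start method is used for worker subprocesses. A minimal sketch of steering that choice through dask's config (the key is the one read above; "spawn" is a standard multiprocessing start method):

import multiprocessing

import dask

with dask.config.set({"distributed.worker.multiprocessing-method": "spawn"}):
    method = dask.config.get("distributed.worker.multiprocessing-method")
    ctx = multiprocessing.get_context(method)   # context for the configured start method
    print(type(ctx).__name__)                   # -> "SpawnContext"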
# configuration of UCX can happen in two ways:
# 1) high level on/off flags which correspond to UCX configuration
# 2) explicitly defined UCX configuration flags
# import does not initialize ucp -- this will occur outside this function
from ucp import get_config

options = {}
# if any of the high level flags are set, as long as they are not Null/None,
# we assume we should configure basic TLS settings for UCX, otherwise we
# leave UCX to its default configuration
if any(
    [
        dask.config.get("ucx.tcp"),
        dask.config.get("ucx.nvlink"),
        dask.config.get("ucx.infiniband"),
    ]
):
    if dask.config.get("ucx.rdmacm"):
        tls = "tcp,rdmacm"
        tls_priority = "rdmacm"
    else:
        tls = "tcp,sockcm"
        tls_priority = "sockcm"

    # CUDA COPY can optionally be used with ucx -- we rely on the user
    # to define when messages will include CUDA objects. Note:
    # defining only the Infiniband flag will not enable cuda_copy
    if any([dask.config.get("ucx.nvlink"), dask.config.get("ucx.cuda_copy")]):
        tls = tls + ",cuda_copy"
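A sketch of the flag-to-TLS mapping implemented above, using the same `ucx.*` keys. Setting the keys here only populates dask's config for illustration; it does not initialize UCX, and rebuilding the string by hand is not the library's API:

import dask

with dask.config.set({"ucx.tcp": True, "ucx.nvlink": True, "ucx.rdmacm": False}):
    tls = "tcp,rdmacm" if dask.config.get("ucx.rdmacm") else "tcp,sockcm"
    if dask.config.get("ucx.nvlink") or dask.config.get("ucx.cuda_copy", default=False):
        tls += ",cuda_copy"
    print(tls)  # -> "tcp,sockcm,cuda_copy"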
def __init__(self, interval, loop=None, serializers=None):
    # XXX is the loop arg useful?
    self.loop = loop or IOLoop.current()
    self.interval = parse_timedelta(interval, default="ms")
    self.waker = locks.Event()
    self.stopped = locks.Event()
    self.please_stop = False
    self.buffer = []
    self.comm = None
    self.message_count = 0
    self.batch_count = 0
    self.byte_count = 0
    self.next_deadline = None
    self.recent_message_log = deque(
        maxlen=dask.config.get("distributed.comm.recent-messages-log-length")
    )
    self.serializers = serializers
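The interval accepted above goes through `parse_timedelta`, so both strings and bare numbers work, and the message-log length comes straight from dask's config. A small sketch (the `default=` fallback guards against the config key not being registered in your environment):

import dask
from dask.utils import parse_timedelta

print(parse_timedelta("5ms"))            # 0.005 (seconds)
print(parse_timedelta(5, default="ms"))  # numbers with no unit fall back to the default ("ms" here)

maxlen = dask.config.get("distributed.comm.recent-messages-log-length", default=0)
print(maxlen)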
    while chunk_size >= self.max_chunk_size:
        if rowChunk > colChunk:
            nChunks_row += 1
        else:
            nChunks_col += 1
        rowChunk = int(np.ceil(stack.shape[0] / nChunks_row))
        colChunk = int(np.ceil(stack.shape[1] / nChunks_col))
        chunk_size = rowChunk * colChunk * 8 * 1e-6  # in Mb
    stack = stack.rechunk((rowChunk, colChunk))
else:
    print('DASK: Chunking by columns')
    # Autochunking by columns is faster for Inversions
    target_size = dask.config.get('array.chunk-size').replace('MiB', ' MB')
    stack = stack.rechunk({0: -1, 1: 'auto'})

print('Tile size (nD, nC): ', stack.shape)
# print('Chunk sizes (nD, nC): ', stack.chunks)  # For debugging only
print('Number of chunks: %.0f x %.0f = %.0f' %
      (len(stack.chunks[0]), len(stack.chunks[1]), len(stack.chunks[0]) * len(stack.chunks[1])))
print("Target chunk size: %s" % target_size)
print('Max chunk size %.0f x %.0f = %.3f MB' %
      (max(stack.chunks[0]), max(stack.chunks[1]), max(stack.chunks[0]) * max(stack.chunks[1]) * 8 * 1e-6))
print('Min chunk size %.0f x %.0f = %.3f MB' %
      (min(stack.chunks[0]), min(stack.chunks[1]), min(stack.chunks[0]) * min(stack.chunks[1]) * 8 * 1e-6))
print('Max RAM (GB x %.0f CPU): %.6f' %
      (self.n_cpu, max(stack.chunks[0]) * max(stack.chunks[1]) * 8 * 1e-9 * self.n_cpu))
print('Tile size (GB): %.3f' % (stack.shape[0] * stack.shape[1] * 8 * 1e-9))

if self.forward_only:
    with ProgressBar():
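The column branch of the excerpt leans on dask's own auto-chunking, which targets the `array.chunk-size` config value. A minimal sketch of that mechanism on a plain dask array (the 8-bytes-per-element arithmetic mirrors the excerpt's size estimates; the array shape is made up):

import dask
import dask.array as da
import numpy as np

with dask.config.set({"array.chunk-size": "64MiB"}):
    stack = da.zeros((200_000, 500), dtype=np.float64)
    stack = stack.rechunk({0: -1, 1: "auto"})   # whole rows, auto-sized column chunks
    nbytes = max(stack.chunks[0]) * max(stack.chunks[1]) * 8
    print(stack.chunks[1], "%.1f MB" % (nbytes * 1e-6))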
        elif scheduler.lower() in ("dask.distributed", "distributed"):
            from distributed.worker import get_client

            return get_client().get
        else:
            raise ValueError(
                "Expected one of [distributed, %s]"
                % ", ".join(sorted(named_schedulers))
            )
    # else:  # try to connect to remote scheduler with this name
    #     return get_client(scheduler).get

if config.get("scheduler", None):
    return get_scheduler(scheduler=config.get("scheduler", None))

if config.get("get", None):
    raise ValueError(get_err_msg)

if getattr(thread_state, "key", False):
    from distributed.worker import get_worker

    return get_worker().client.get

if cls is not None:
    return cls.__dask_scheduler__

if collections:
    collections = [c for c in collections if c is not None]
if collections:
    get = collections[0].__dask_scheduler__
    if not all(c.__dask_scheduler__ == get for c in collections):
        raise ValueError(
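The resolution order sketched above (an explicit scheduler argument, then the `scheduler` entry in dask's config, then the collections' own default) is visible from the user side. A minimal sketch, assuming dask's array extra is installed:

import dask
import dask.array as da

x = da.ones((1000, 1000), chunks=(100, 100))

with dask.config.set(scheduler="synchronous"):
    x.sum().compute()                     # picked up from config: runs single-threaded

x.sum().compute(scheduler="threads")      # an explicit argument wins over config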
loggers = {  # default values
    "distributed": "info",
    "distributed.client": "warning",
    "bokeh": "error",
    "tornado": "critical",
    "tornado.application": "error",
}
base_config = _find_logging_config(config)
loggers.update(base_config.get("logging", {}))

handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(
    logging.Formatter(
        dask.config.get("distributed.admin.log-format", config=config)
    )
)
for name, level in loggers.items():
    if isinstance(level, str):
        level = logging_names[level.upper()]
    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.handlers[:] = []
    logger.addHandler(handler)
    logger.propagate = False
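The loop above maps logger names from the config's `logging` section onto stdlib logging levels, with the string values upper-cased and resolved through the level table. A sketch of the "old style" section this helper consumes, set through `dask.config.set` (the helper pulls it back out via `_find_logging_config`):

import dask

dask.config.set(
    {
        "logging": {
            "distributed": "debug",
            "distributed.client": "warning",
            "tornado": "critical",
        }
    }
)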
"You must specify how much cores and memory per job you want to use, for example:\n"
"cluster = {}(cores={}, memory={!r})".format(
cluster_class_name, cores or 8, memory or "24GB"
)
)
if job_name is None:
job_name = dask.config.get("jobqueue.%s.name" % self.config_name)
if processes is None:
processes = dask.config.get("jobqueue.%s.processes" % self.config_name)
if processes is None:
processes, _ = nprocesses_nthreads(cores)
if interface is None:
interface = dask.config.get("jobqueue.%s.interface" % self.config_name)
if death_timeout is None:
death_timeout = dask.config.get(
"jobqueue.%s.death-timeout" % self.config_name
)
if local_directory is None:
local_directory = dask.config.get(
"jobqueue.%s.local-directory" % self.config_name
)
if extra is None:
extra = dask.config.get("jobqueue.%s.extra" % self.config_name)
if env_extra is None:
env_extra = dask.config.get("jobqueue.%s.env-extra" % self.config_name)
if header_skip is None:
header_skip = dask.config.get(
"jobqueue.%s.header-skip" % self.config_name, ()
)
if log_directory is None:
log_directory = dask.config.get(
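All of these fall-backs read from the `jobqueue.<config_name>` namespace, so site-wide defaults can be set once in dask's config instead of being passed to every cluster. A minimal sketch, assuming dask-jobqueue is installed and using the LSF namespace as the example (the log directory path is made up):

import dask

dask.config.set(
    {
        "jobqueue.lsf.cores": 8,
        "jobqueue.lsf.memory": "24GB",
        "jobqueue.lsf.queue": "normal",
        "jobqueue.lsf.walltime": "01:00",
        "jobqueue.lsf.log-directory": "/tmp/dask-logs",  # hypothetical path
    }
)

# from dask_jobqueue import LSFCluster
# cluster = LSFCluster()   # picks up the values above instead of requiring arguments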
super().__init__(
    scheduler=scheduler, name=name, config_name=config_name, **base_class_kwargs
)

if queue is None:
    queue = dask.config.get("jobqueue.%s.queue" % self.config_name)
if project is None:
    project = dask.config.get("jobqueue.%s.project" % self.config_name)
if ncpus is None:
    ncpus = dask.config.get("jobqueue.%s.ncpus" % self.config_name)
if mem is None:
    mem = dask.config.get("jobqueue.%s.mem" % self.config_name)
if walltime is None:
    walltime = dask.config.get("jobqueue.%s.walltime" % self.config_name)
if job_extra is None:
    job_extra = dask.config.get("jobqueue.%s.job-extra" % self.config_name)
if lsf_units is None:
    lsf_units = dask.config.get("jobqueue.%s.lsf-units" % self.config_name)
if use_stdin is None:
    use_stdin = dask.config.get("jobqueue.%s.use-stdin" % self.config_name)
self.use_stdin = use_stdin

header_lines = []
# LSF header build
if self.name is not None:
    header_lines.append("#BSUB -J %s" % self.job_name)
if self.log_directory is not None:
    header_lines.append(
        "#BSUB -e %s/%s-%%J.err" % (self.log_directory, self.name or "worker")
    )
    header_lines.append(
        "#BSUB -o %s/%s-%%J.out" % (self.log_directory, self.name or "worker")
    )
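The header built above ends up in the generated submission script. A sketch of inspecting it, assuming dask-jobqueue is installed; nothing is submitted to LSF until the cluster is scaled, and the log directory is a made-up path:

from dask_jobqueue import LSFCluster

cluster = LSFCluster(cores=8, memory="24GB", log_directory="/tmp/dask-logs")
print(cluster.job_script())   # contains the "#BSUB -J", "#BSUB -e", "#BSUB -o" lines built above
cluster.close()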
    restart_policy='Never',
    volumes=spec.volumes,
    service_account=spec.service_account,
)
if spec.image_pull_secret:
    pod_spec.image_pull_secrets = [
        client.V1LocalObjectReference(name=spec.image_pull_secret)
    ]

pod = client.V1Pod(
    metadata=client.V1ObjectMeta(namespace=namespace, labels=pod_labels),
    # annotations=meta.annotation),
    spec=pod_spec,
)

svc_temp = dask.config.get("kubernetes.scheduler-service-template")
if spec.service_type or spec.node_port:
    if spec.node_port:
        spec.service_type = 'NodePort'
        svc_temp['spec']['ports'][1]['nodePort'] = spec.node_port
    update_in(svc_temp, 'spec.type', spec.service_type)

norm_name = normalize_name(meta.name)
dask.config.set(
    {
        "kubernetes.scheduler-service-template": svc_temp,
        'kubernetes.name': 'mlrun-' + norm_name + '-{uuid}',
    }
)
cluster = KubeCluster(
    pod,
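The scheduler service template mutated above is an ordinary dask config entry. A minimal sketch of reading it, forcing a NodePort, and writing it back, assuming a dask-kubernetes version that ships the classic KubeCluster and registers the `kubernetes.*` defaults on import (the port number is made up):

import dask
import dask_kubernetes  # noqa: F401  (registers the "kubernetes" config defaults)

svc_temp = dask.config.get("kubernetes.scheduler-service-template")
svc_temp["spec"]["type"] = "NodePort"
svc_temp["spec"]["ports"][1]["nodePort"] = 30087   # hypothetical port, as in the excerpt's ports[1]
dask.config.set({"kubernetes.scheduler-service-template": svc_temp})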