Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
enable_proctitle_on_current()
enable_proctitle_on_children()
if sys.platform.startswith('linux'):
import resource # module fails importing on Windows
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
limit = max(soft, hard // 2)
resource.setrlimit(resource.RLIMIT_NOFILE, (limit, hard))
addr = uri_from_host_port('', None, 0)
loop = IOLoop.current()
services = {}
bokeh = False
with ignoring(ImportError):
from distributed.bokeh.scheduler import BokehScheduler
services[('bokeh', 0)] = (BokehScheduler, {})
bokeh = True
scheduler = Scheduler(loop=loop, services=services)
scheduler.start(addr)
install_signal_handlers(loop)
app_client.kv['dask.scheduler'] = scheduler.address.encode()
if bokeh:
bokeh_port = scheduler.services['bokeh'].port
bokeh_host = urlparse(scheduler.address).hostname
bokeh_address = 'http://%s:%d' % (bokeh_host, bokeh_port)
def _gather(self, futures, errors='raise'):
futures2, keys = unpack_remotedata(futures, byte_keys=True)
keys = [tokey(key) for key in keys]
bad_data = dict()
@gen.coroutine
def wait(k):
""" Want to stop the All(...) early if we find an error """
yield self.futures[k]['event'].wait()
if self.futures[k]['status'] != 'finished':
raise Exception()
while True:
logger.debug("Waiting on futures to clear before gather")
with ignoring(Exception):
yield All([wait(key) for key in keys if key in self.futures])
exceptions = set()
bad_keys = set()
for key in keys:
if (key not in self.futures or
self.futures[key]['status'] == 'error'):
exceptions.add(key)
if errors == 'raise':
try:
d = self.futures[key]
six.reraise(type(d['exception']),
d['exception'],
d['traceback'])
except KeyError:
six.reraise(CancelledError,
self.futures[msg['key']]['status'] = 'error'
try:
self.futures[msg['key']]['exception'] = loads(msg['exception'])
except TypeError:
self.futures[msg['key']]['exception'] = \
Exception('Undeserializable exception', msg['exception'])
self.futures[msg['key']]['traceback'] = (loads(msg['traceback'])
if msg['traceback'] else None)
self.futures[msg['key']]['event'].set()
elif msg['op'] == 'restart':
logger.info("Receive restart signal from scheduler")
events = [d['event'] for d in self.futures.values()]
self.futures.clear()
for e in events:
e.set()
with ignoring(AttributeError):
self._restart_event.set()
elif 'error' in msg['op']:
logger.warn("Scheduler exception:")
logger.exception(msg['exception'])
if breakout:
break
"""
if timeout == no_default:
timeout = self._timeout * 2
# XXX handling of self.status here is not thread-safe
if self.status == 'closed':
return
self.status = 'closing'
if self.asynchronous:
future = self._close()
if timeout:
future = gen.with_timeout(timedelta(seconds=timeout), future)
return future
if self._start_arg is None:
with ignoring(AttributeError):
self.cluster.close()
sync(self.loop, self._close, fast=True)
assert self.status == 'closed'
if self._should_close_loop:
self._loop_runner.stop()
with ignoring(AttributeError):
dask.set_options(get=self._previous_get)
with ignoring(AttributeError):
dask.set_options(shuffle=self._previous_shuffle)
if self.get == _globals.get('get'):
del _globals['get']
return future
if self._start_arg is None:
with ignoring(AttributeError):
self.cluster.close()
sync(self.loop, self._close, fast=True)
assert self.status == 'closed'
if self._should_close_loop:
self._loop_runner.stop()
with ignoring(AttributeError):
dask.set_options(get=self._previous_get)
with ignoring(AttributeError):
dask.set_options(shuffle=self._previous_shuffle)
if self.get == _globals.get('get'):
del _globals['get']
import pandas.msgpack as msgpack
except ImportError:
import msgpack
from toolz import identity, get_in
from .utils import ignoring
# Registry of available compression codecs, keyed by codec name (``None``
# means "no compression"). Each entry maps 'compress' and 'decompress' to a
# callable; ``identity`` is the pass-through used for the uncompressed case.
compressions = {None: {'compress': identity,
                       'decompress': identity}}

# Codec used by default. Each successful optional import below overrides the
# previous choice, giving the preference order lz4 > snappy > None.
# zlib is registered when available but never becomes the default.
default_compression = None

with ignoring(ImportError):
    import zlib
    compressions['zlib'] = {'compress': zlib.compress,
                            'decompress': zlib.decompress}

with ignoring(ImportError):
    import snappy
    compressions['snappy'] = {'compress': snappy.compress,
                              'decompress': snappy.decompress}
    default_compression = 'snappy'

with ignoring(ImportError):
    import lz4
    try:
        # python-lz4 >= 0.10 ships the ``lz4.block`` module, and 1.0 removed
        # the legacy top-level LZ4_compress / LZ4_uncompress functions.
        # Looking those up unconditionally raises AttributeError, which
        # ``ignoring(ImportError)`` does not swallow, so the whole module
        # would fail to import on current lz4 releases.
        # block.compress stores the uncompressed size by default, matching
        # the legacy LZ4_compress frame layout.
        import lz4.block
        _lz4_compress = lz4.block.compress
        _lz4_decompress = lz4.block.decompress
    except ImportError:
        # Very old python-lz4 without the ``block`` submodule.
        _lz4_compress = lz4.LZ4_compress
        _lz4_decompress = lz4.LZ4_uncompress
    compressions['lz4'] = {'compress': _lz4_compress,
                           'decompress': _lz4_decompress}
    default_compression = 'lz4'
def remove_client(self, client=None):
    """Remove *client* from network.

    Logs the removal and records a ``remove-client`` event under ``'all'``
    and under the client itself. It then hands every key the client still
    wants to ``client_releases_keys``. Finally it forgets the client's
    ``wants_what`` entry; a missing entry is tolerated.
    """
    logger.info("Remove client %s", client)
    event = {'action': 'remove-client', 'client': client}
    self.log_event(['all', client], event)

    desired = self.wants_what.get(client, ())
    self.client_releases_keys(desired, client)

    # Forget the entry only after releasing: client_releases_keys may still
    # consult wants_what[client] while it runs.
    self.wants_what.pop(client, None)
name='%s.%s' % (JOB_ID, TASK_ID),
preexec=preexec_commands)
with open(fn, 'wt') as f:
f.write(script_contents)
@atexit.register
def remove_script():
if os.path.exists(fn):
os.remove(fn)
os.chmod(self.script, 0o777)
else:
self._should_cleanup_script = False
if copy_script:
with ignoring(EnvironmentError): # may be in the same path
shutil.copy(script, os.path.curdir) # python 2.x returns None
script = os.path.join(os.path.curdir, os.path.basename(script))
self._should_cleanup_script = True
self.script = os.path.abspath(script)
assert not preexec_commands, "Cannot specify both script and preexec_commands"
# TODO: check that user-provided script is executable
self.template = merge(default_template,
{'remoteCommand': self.script},
template or {})
self._cleanup_callback = PeriodicCallback(callback=self.cleanup_closed_workers,
callback_time=cleanup_interval,
io_loop=self.scheduler.loop)
self._cleanup_callback.start()
def _close(self, fast=False):
""" Send close signal and wait until scheduler completes.

Tear-down steps, in order:
- mark the client as 'closing';
- stop its periodic callbacks;
- restore saved dask options;
- ask the scheduler to close the stream;
- release all futures' keys;
- close the cluster if ``self._start_arg`` is None;
- mark the client 'closed'.

Uses ``yield`` and ``raise gen.Return()``, so it is presumably run as a
tornado ``gen.coroutine`` (the decorator is not visible here - confirm).

NOTE(review): ``fast`` is not referenced anywhere in this body.
"""
self.status = 'closing'
with log_errors():
# Stop every periodic callback registered on this client.
for pc in self._periodic_callbacks.values():
pc.stop()
self._scheduler_identity = {}
# Restore dask options saved earlier; ignoring(AttributeError) presumably
# covers the case where _previous_get / _previous_shuffle were never set.
with ignoring(AttributeError):
dask.set_options(get=self._previous_get)
with ignoring(AttributeError):
dask.set_options(shuffle=self._previous_shuffle)
# Drop the global 'get' only if it is still this client's get.
if self.get == _globals.get('get'):
del _globals['get']
# NOTE(review): status was set to 'closing' at the top of this method, so
# this early return looks unreachable unless something above changes
# self.status - consider checking for 'closed' before that assignment.
if self.status == 'closed':
raise gen.Return()
# Only send close-stream while the underlying comm is still open.
if self.scheduler_comm and self.scheduler_comm.comm and not self.scheduler_comm.comm.closed():
self._send_to_scheduler({'op': 'close-stream'})
yield self.scheduler_comm.close()
# Iterate over a snapshot: _release_key presumably mutates self.futures.
for key in list(self.futures):
self._release_key(key=key)
# Close the cluster only when no start argument was given (presumably the
# cluster was created by this client - confirm against __init__).
with ignoring(AttributeError):
yield self.cluster._close()
self.status = 'closed'
----------
minimum_cores: int
Minimum number of cores for the cluster
maximum_cores: int
Maximum number of cores for the cluster
minimum_memory: str
Minimum amount of memory for the cluster
maximum_memory: str
Maximum amount of memory for the cluster
Examples
--------
>>> cluster.adapt(minimum=0, maximum=10, interval='500ms')
>>> cluster.adapt(minimum_cores=24, maximum_cores=96)
>>> cluster.adapt(minimum_memory='60 GB', maximum_memory= '1 TB')
"""
with ignoring(AttributeError):
self._adaptive.stop()
if not hasattr(self, "_adaptive_options"):
self._adaptive_options = {}
if "minimum" not in kwargs:
if minimum_cores is not None:
kwargs["minimum"] = self._get_nb_workers_from_cores(minimum_cores)
elif minimum_memory is not None:
kwargs["minimum"] = self._get_nb_workers_from_memory(minimum_memory)
if "maximum" not in kwargs:
if maximum_cores is not None:
kwargs["maximum"] = self._get_nb_workers_from_cores(maximum_cores)
elif maximum_memory is not None:
kwargs["maximum"] = self._get_nb_workers_from_memory(maximum_memory)
self._adaptive_options.update(kwargs)
try:
self._adaptive = Adaptive(self.scheduler, self, **self._adaptive_options)