def __str__(self):
    if hasattr(self, '_loop_thread'):
        n = sync(self.loop, self.scheduler.ncores)
        return '<Executor: scheduler="%s:%d" processes=%d cores=%d>' % (
                self.scheduler.ip, self.scheduler.port, len(n),
                sum(n.values()))
    else:
        return '<Executor: scheduler="%s:%d">' % (
                self.scheduler.ip, self.scheduler.port)
def shutdown(self, timeout=10):
    """ Send shutdown signal and wait until scheduler terminates """
    if self.status == 'closed':
        return
    self.status = 'closed'
    # Ask the scheduler to close our stream, then close it locally
    with ignoring(AttributeError):
        self.loop.add_callback(self.scheduler_stream.send,
                               {'op': 'close-stream'})
        sync(self.loop, self.scheduler_stream.close)
    with ignoring(AttributeError):
        self.scheduler.close_rpc()
    # Stop the event loop and join its thread if we own them
    if self._should_close_loop:
        sync(self.loop, self.loop.stop)
        self.loop.close(all_fds=True)
        self._loop_thread.join(timeout=timeout)
    # Restore the dask options we overrode at startup
    with ignoring(AttributeError):
        dask.set_options(get=self._previous_get)
    with ignoring(AttributeError):
        dask.set_options(shuffle=self._previous_shuffle)
    if _global_executor[0] is self:
        _global_executor[0] = None
    if self.get == _globals.get('get'):
        del _globals['get']
    with ignoring(AttributeError):
        self.cluster.close()
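# Hedged usage sketch (not part of the original module): a typical
# connect/compute/shutdown round trip. The scheduler address is an
# assumption; shutdown is idempotent, so calling it twice is safe.
#
#   >>> e = Executor('127.0.0.1:8786')            # doctest: +SKIP
#   >>> future = e.submit(lambda x: x + 1, 1)     # doctest: +SKIP
#   >>> e.shutdown()                              # doctest: +SKIP
#   >>> e.shutdown()  # no-op, already closed     # doctest: +SKIP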
""" Basic information about the workers in the cluster
Examples
--------
>>> e.scheduler_info() # doctest: +SKIP
{'id': '2de2b6da-69ee-11e6-ab6a-e82aea155996',
'services': {},
'type': 'Scheduler',
'workers': {'127.0.0.1:40575': {'active': 0,
'last-seen': 1472038237.4845693,
'name': '127.0.0.1:40575',
'services': {},
'stored': 0,
'time-delay': 0.0061032772064208984}}}
"""
return sync(self.loop, self.scheduler.identity)
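# Hedged sketch (assumption, not from the original file): the returned
# dict is a plain mapping, so it can be inspected directly, e.g. to count
# connected workers.
#
#   >>> info = e.scheduler_info()    # doctest: +SKIP
#   >>> len(info['workers'])         # doctest: +SKIP
#   1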
def start_ipython_scheduler(self, magic_name='scheduler_if_ipython',
                            qtconsole=False, qtconsole_args=None):
    """ Start IPython kernel on the scheduler

    Examples
    --------
    >>> e.start_ipython_scheduler()  # doctest: +SKIP
    >>> %scheduler workers  # doctest: +SKIP
    {'127.0.0.1:3595': {'inc-1', 'inc-2'},
     '127.0.0.1:53589': {'inc-2', 'add-5'}}

    >>> e.start_ipython_scheduler(qtconsole=True)  # doctest: +SKIP

    Returns
    -------
    connection_info: dict
        connection_info dict containing info necessary
        to connect Jupyter clients to the scheduler.

    See Also
    --------
    Executor.start_ipython_workers: Start IPython on the workers
    """
    info = sync(self.loop, self.scheduler.start_ipython)
    if magic_name == 'scheduler_if_ipython':
        # default to %scheduler if in IPython, no magic otherwise
        in_ipython = False
        if 'IPython' in sys.modules:
            from IPython import get_ipython
            in_ipython = bool(get_ipython())
        if in_ipython:
            magic_name = 'scheduler'
        else:
            magic_name = None
    if magic_name:
        from ._ipython_utils import register_worker_magic
        register_worker_magic(info, magic_name)
    if qtconsole:
        from ._ipython_utils import connect_qtconsole
        connect_qtconsole(info, name='dask-scheduler',
                          extra_args=qtconsole_args)
    return info
def start(self, **kwargs):
    """ Start scheduler running in separate thread """
    if hasattr(self, '_loop_thread'):
        return
    if not self.loop._running:
        from threading import Thread
        self._loop_thread = Thread(target=self.loop.start)
        self._loop_thread.daemon = True
        self._loop_thread.start()
        while not self.loop._running:
            sleep(0.001)
        # No-op periodic callback so the loop wakes up regularly
        pc = PeriodicCallback(lambda: None, 1000, io_loop=self.loop)
        self.loop.add_callback(pc.start)
    _global_executor[0] = self
    sync(self.loop, self._start, **kwargs)
    self.status = 'running'
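# Minimal standalone sketch (assumption, not part of this module) of the
# pattern ``start`` uses: run a Tornado IOLoop in a daemon thread, then
# submit coroutines to it from the calling thread via ``sync``.
#
#   from threading import Thread
#   from tornado.ioloop import IOLoop
#
#   loop = IOLoop()
#   thread = Thread(target=loop.start)
#   thread.daemon = True
#   thread.start()
#   # sync(loop, coroutine, *args) blocks until the coroutine completes
#   # on the loop thread and returns its result to the caller.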
def gather(self, futures, errors='raise', maxsize=0):
    """ Gather futures from distributed memory

    Accepts a future, nested container of futures, iterator, or queue.
    The return type will match the input type.

    See Also
    --------
    Executor.scatter: Send data out to cluster
    """
    if isqueue(futures):
        # Stream results through a queue filled by a background thread
        qout = pyQueue(maxsize=maxsize)
        t = Thread(target=self._threaded_gather, args=(futures, qout),
                   kwargs={'errors': errors})
        t.daemon = True
        t.start()
        return qout
    elif isinstance(futures, Iterator):
        # Lazily gather each future as the iterator is consumed
        return (self.gather(f, errors=errors) for f in futures)
    else:
        return sync(self.loop, self._gather, futures, errors=errors)
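# Hedged usage sketch (assumption, not from the original file): gather
# mirrors the shape of its input, so nested containers come back nested.
#
#   >>> x = e.submit(lambda a: a + 1, 1)                 # doctest: +SKIP
#   >>> e.gather(x)                                      # doctest: +SKIP
#   2
#   >>> e.gather([x, [x]])  # lists are preserved        # doctest: +SKIP
#   [2, [2]]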
def publish_dataset(self, **kwargs):
    """ Publish named datasets to scheduler

    Stores a named reference to a dask collection or list of futures
    on the scheduler, where other Executors can retrieve it.

    Examples
    --------
    Publishing client:
    >>> e.publish_dataset(my_dataset=df)  # doctest: +SKIP

    Receiving client:
    >>> e.list_datasets()  # doctest: +SKIP
    ['my_dataset']
    >>> df2 = e.get_dataset('my_dataset')  # doctest: +SKIP

    Returns
    -------
    None

    See Also
    --------
    Executor.list_datasets
    Executor.get_dataset
    Executor.unpublish_dataset
    Executor.persist
    """
    return sync(self.loop, self._publish_dataset, **kwargs)
def who_has(self, futures=None):
    """ The workers storing each future's data

    Parameters
    ----------
    futures: list (optional)
        A list of futures, defaults to all data

    Examples
    --------
    >>> x, y, z = e.map(inc, [1, 2, 3])  # doctest: +SKIP
    >>> e.who_has([x, y])  # doctest: +SKIP
    {'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'],
     'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784']}

    See Also
    --------
    Executor.has_what
    Executor.ncores
    """
    if futures is not None:
        futures = self.futures_of(futures)
        keys = list({f.key for f in futures})
    else:
        keys = None
    return sync(self.loop, self.scheduler.who_has, keys=keys)
def futures_to_dask_array(futures, client=None):
    """ Convert a nested list of futures to a dask array

    The futures must:

    * Be evenly partitioned along each axis
    * Serve as individual chunks of the full dask array
    * Have shapes consistent with how dask.array sets up chunks
      (all dimensions consistent across any particular direction)

    This function will block until all futures are accessible. It queries
    the futures directly for shape and dtype information.

    Parameters
    ----------
    futures: iterable of Futures
        Futures that create arrays
    client: Client (optional)
        Client through which we access the remote arrays
    """
    client = default_client(client)
    return sync(client.loop, _futures_to_dask_array, futures,
                client=client)
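# Hedged usage sketch (assumption, not from the original file): futures
# that each hold a numpy chunk, arranged in a nested list matching the
# chunk grid, can be stitched into a single dask array.
#
#   >>> import numpy as np                                    # doctest: +SKIP
#   >>> futures = [[e.submit(np.ones, (5, 5)) for _ in range(2)]
#   ...            for _ in range(2)]                         # doctest: +SKIP
#   >>> x = futures_to_dask_array(futures, client=e)          # doctest: +SKIP
#   >>> x.shape                                               # doctest: +SKIP
#   (10, 10)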