Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def update(self, state, metadata=None):
    """Refresh the profile plot and, when metadata is supplied, the time series.

    Parameters
    ----------
    state
        Profile state; stored on ``self.state`` and passed to
        ``profile.plot_data``.
    metadata : optional
        Object indexed by ``"counts"`` and ``"keys"``.  When it is ``None``
        or its ``"counts"`` entry is falsy, only the profile plot is
        refreshed and the task names / time series are left untouched.
    """
    with log_errors():
        self.state = state
        plot_data = profile.plot_data(self.state, profile_interval)
        # "states" is kept on the instance and removed from the plotted data.
        self.states = plot_data.pop("states")
        update(self.source, plot_data)

        if metadata is None or not metadata["counts"]:
            return

        self.task_names = ["All"] + sorted(metadata["keys"])
        self.select.options = self.task_names

        # A selected key narrows the series to that key; otherwise the
        # overall counts are used.
        series = metadata["keys"][self.key] if self.key else metadata["counts"]
        times, counts = zip(*series)
        self.ts = {"count": counts, "time": [t * 1000 for t in times]}
        self.ts_source.data.update(self.ts)
def deserialize_cudf_dataframe(header, frames):
    """Rebuild an object from ``header`` and ``frames``.

    The type is recovered by unpickling ``header["type"]``; the result of
    calling that type's ``deserialize(header, frames)`` is returned.
    """
    with log_errors():
        # NOTE(review): pickle.loads can execute arbitrary code; this is only
        # safe if headers always come from a trusted peer — confirm.
        klass = pickle.loads(header["type"])
        return klass.deserialize(header, frames)
def scale_cb(b):
    """Scale to the value currently held by ``request`` and refresh.

    ``b`` is accepted but not used by the body.
    """
    with log_errors():
        target = request.value
        try:
            self._adaptive.stop()
        except AttributeError:
            # Deliberately ignored (presumably ``_adaptive`` may be absent
            # when adaptive scaling was never enabled — verify).
            pass
        self.scale(target)
        update()
async def cb():
    """Fetch profile data (and optionally metadata), then schedule ``self.update``.

    The update itself is not run here; it is queued on the document via
    ``add_next_tick_callback``.
    """
    with log_errors():
        prof_state = await self.server.get_profile(
            key=self.key, start=self.start, stop=self.stop
        )
        metadata = (
            await self.server.get_profile_metadata() if update_metadata else None
        )
        if isinstance(prof_state, gen.Future):
            # NOTE(review): if this branch is taken while ``metadata`` is
            # None, ``asyncio.gather`` would be handed a non-awaitable —
            # confirm whether this path is still reachable.
            prof_state, metadata = await asyncio.gather(prof_state, metadata)
        self.doc().add_next_tick_callback(
            lambda: self.update(prof_state, metadata)
        )
def crossfilter_doc(worker, extra, doc):
    """Populate ``doc`` with the worker cross-filter page.

    Parameters
    ----------
    worker
        Passed to both ``StateTable`` and ``CrossFilter``.
    extra
        Merged into ``doc.template_variables`` after ``"active_page"`` is set,
        so it may override that entry.
    doc
        Document that receives the title, roots, template and theme.
    """
    with log_errors():
        state_table = StateTable(worker)
        cross_filter = CrossFilter(worker)

        doc.title = "Dask Worker Cross-filter"
        # Both components are registered with the same 500 interval.
        for component in (state_table, cross_filter):
            add_periodic_callback(doc, component, 500)

        doc.add_root(column(state_table.root, cross_filter.root))
        doc.template = env.get_template("simple.html")
        doc.template_variables["active_page"] = "crossfilter"
        doc.template_variables.update(extra)
        doc.theme = BOKEH_THEME
def deserialize_numpy_ndarray(header, frames):
# Decode the dtype and shape described by ``header`` for the payload carried
# in ``frames``.
# NOTE(review): the visible body stops after choosing ``shape``; ``frame``,
# ``dt`` and ``shape`` are never used and nothing is returned on the
# non-pickle path, so this definition looks truncated — confirm against the
# full source before relying on it.
with log_errors():
if header.get("pickle"):
# Pickled payload: the first frame is the pickle stream and the remaining
# frames are handed to pickle as out-of-band buffers.
return pickle.loads(frames[0], buffers=frames[1:])
# Exactly one frame is expected here; the unpacking raises ValueError otherwise.
(frame,) = frames
is_custom, dt = header["dtype"]
if is_custom:
# Entries flagged as custom are unpickled to obtain the dtype.
dt = pickle.loads(dt)
else:
# Everything else is passed to ``np.dtype``.
dt = np.dtype(dt)
# A truthy "broadcast_to" entry takes precedence over "shape".
if header.get("broadcast_to"):
shape = header["broadcast_to"]
else:
shape = header["shape"]
def processing_update(msg):
# Build per-name column lists from ``msg["processing"]`` and
# ``msg["nthreads"]``, ordered by sorted name.
# NOTE(review): ``d`` is assembled but neither returned nor used in the
# visible body, so this definition looks truncated — confirm against the
# full source.
with log_errors():
names = sorted(msg["processing"])
# NOTE(review): ``names`` is already sorted by the line above; this second
# sort is redundant.
names = sorted(names)
processing = msg["processing"]
# Re-order the values to follow the sorted names.
processing = [processing[name] for name in names]
nthreads = msg["nthreads"]
nthreads = [nthreads[name] for name in names]
n = len(names)
d = {
"name": list(names),
"processing": processing,
# "right" is a separate copy of the processing values.
"right": list(processing),
# "top" counts down n..1 and "bottom" counts down n-1..0, so the first
# name gets the highest values.
"top": list(range(n, 0, -1)),
"bottom": list(range(n - 1, -1, -1)),
"nthreads": nthreads,
}
def tasks_doc(scheduler, extra, doc):
    """Populate ``doc`` with the task-stream page.

    Parameters
    ----------
    scheduler
        Passed to ``TaskStream``.
    extra
        Merged into ``doc.template_variables``.
    doc
        Document that receives the title, root, template and theme.
    """
    with log_errors():
        # The number of rectangles is read from the dask configuration.
        stream_length = dask.config.get(
            "distributed.scheduler.dashboard.tasks.task-stream-length"
        )
        task_stream = TaskStream(
            scheduler,
            n_rectangles=stream_length,
            clear_interval="60s",
            sizing_mode="stretch_both",
        )
        # Update once immediately, then register the periodic callback.
        task_stream.update()
        add_periodic_callback(doc, task_stream, 5000)

        doc.title = "Dask: Task Stream"
        doc.add_root(task_stream.root)
        doc.template = env.get_template("simple.html")
        doc.template_variables.update(extra)
        doc.theme = BOKEH_THEME
def create(self, key, k):
    """Initialise the bookkeeping entries for group ``k``.

    Parameters
    ----------
    key
        Key looked up in ``self.scheduler.tasks``; that task's dependents
        and dependencies seed the group's links.
    k
        Group identifier used as the key in ``self.keys``, ``self.groups``,
        ``self.nbytes``, ``self.durations``, ``self.dependents`` and
        ``self.dependencies``.
    """
    with log_errors():
        task = self.scheduler.tasks[key]

        self.keys[k] = set()
        # Every tracked state starts with a zero count.
        self.groups[k] = dict.fromkeys(
            ("memory", "erred", "waiting", "released", "processing"), 0
        )
        self.nbytes[k] = 0
        self.durations[k] = 0
        self.dependents[k] = {
            key_split_group(dependent.key) for dependent in task.dependents
        }

        # Link this group with the group of each dependency, in both directions.
        # NOTE(review): assumes ``self.dependencies[k]`` (and
        # ``self.dependents[group]``) already exist or are created on access —
        # not visible here.
        for dependency in task.dependencies:
            group = key_split_group(dependency.key)
            self.dependents[group].add(k)
            self.dependencies[k].add(group)
our data, with a comfortable buffer.
This is for use with systems like ``distributed.deploy.adaptive``.
Parameters
----------
memory_factor: Number
Amount of extra space we want to have for our stored data.
Defaults to 2, meaning that we want to have twice as much memory as we
currently have data.
Returns
-------
to_close: list of workers that are OK to close
"""
with log_errors():
if all(self.processing.values()):
return []
limit_bytes = {w: self.worker_info[w]['memory_limit']
for w in self.worker_info}
worker_bytes = self.worker_bytes
limit = sum(limit_bytes.values())
total = sum(worker_bytes.values())
idle = sorted(self.idle, key=worker_bytes.get, reverse=True)
to_close = []
while idle:
w = idle.pop()
limit -= limit_bytes[w]