Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def bisect(target, left, right):
if left == right:
return left
mid = (left + right) // 2
value = max(
startstop["stop"] for startstop in self.buffer[mid]["startstops"]
)
if value < target:
return bisect(target, mid + 1, right)
else:
return bisect(target, left, mid)
if isinstance(start, str):
start = time() - parse_timedelta(start)
if start is not None:
start = bisect(start, 0, len(self.buffer))
if isinstance(stop, str):
stop = time() - parse_timedelta(stop)
if stop is not None:
stop = bisect(stop, 0, len(self.buffer))
if count is not None:
if start is None and stop is None:
stop = len(self.buffer)
start = stop - count
elif start is None and stop is not None:
start = stop - count
elif start is not None and stop is None:
stop = start + count
def set_tcp_timeout(comm):
"""
Set kernel-level TCP timeout on the stream.
"""
if comm.closed():
return
timeout = dask.config.get("distributed.comm.timeouts.tcp")
timeout = int(parse_timedelta(timeout, default="seconds"))
sock = comm.socket
# Default (unsettable) value on Windows
# https://msdn.microsoft.com/en-us/library/windows/desktop/dd877220(v=vs.85).aspx
nprobes = 10
assert timeout >= nprobes + 1, "Timeout too low"
idle = max(2, timeout // 4)
interval = max(1, (timeout - idle) // nprobes)
idle = timeout - interval * nprobes
assert idle > 0
try:
if sys.platform.startswith("win"):
logger.debug("Setting TCP keepalive: idle=%d, interval=%d", idle, interval)
timeout : number or string or timedelta, optional
Seconds to wait on the event in the scheduler. This does not
include local coroutine time, network transfer time, etc..
Instead of number of seconds, it is also possible to specify
a timedelta in string format, e.g. "200ms".
Examples
--------
>>> event = Event('a') # doctest: +SKIP
>>> event.wait(timeout="1s") # doctest: +SKIP
Returns
-------
True if the event was set of false, if a timeout happend
"""
timeout = parse_timedelta(timeout)
result = self.client.sync(
self.client.scheduler.event_wait, name=self.name, timeout=timeout,
)
return result
def _watch(thread_id, log, interval="20ms", cycle="2s", omit=None, stop=lambda: False):
interval = parse_timedelta(interval)
cycle = parse_timedelta(cycle)
recent = create()
last = time()
while not stop():
if time() > last + cycle:
log.append((time(), recent))
recent = create()
last = time()
try:
frame = sys._current_frames()[thread_id]
except KeyError:
return
process(frame, None, recent, omit=omit)
sleep(interval)
self._memory_monitoring = False
pc = PeriodicCallback(
self.memory_monitor, self.memory_monitor_interval * 1000,
)
self.periodic_callbacks["memory"] = pc
if extensions is None:
extensions = DEFAULT_EXTENSIONS
for ext in extensions:
ext(self)
self._throttled_gc = ThrottledGC(logger=logger)
setproctitle("dask-worker [not started]")
profile_trigger_interval = parse_timedelta(
dask.config.get("distributed.worker.profile.interval"), default="ms"
)
pc = PeriodicCallback(self.trigger_profile, profile_trigger_interval * 1000)
self.periodic_callbacks["profile"] = pc
pc = PeriodicCallback(self.cycle_profile, profile_cycle_interval * 1000)
self.periodic_callbacks["profile-cycle"] = pc
self.plugins = {}
self._pending_plugins = plugins
self.lifetime = lifetime or dask.config.get(
"distributed.worker.lifetime.duration"
)
lifetime_stagger = lifetime_stagger or dask.config.get(
"distributed.worker.lifetime.stagger"
def __init__(self, keys, scheduler=None, interval="100ms", complete=True):
self.scheduler = get_scheduler(scheduler)
self.client = None
for key in keys:
if hasattr(key, "client"):
self.client = weakref.ref(key.client)
break
self.keys = {k.key if hasattr(k, "key") else k for k in keys}
self.interval = parse_timedelta(interval, default="s")
self.complete = complete
self._start_time = default_timer()
class RPCClosed(IOError):
pass
logger = logging.getLogger(__name__)
def raise_later(exc):
def _raise(*args, **kwargs):
raise exc
return _raise
tick_maximum_delay = parse_timedelta(
dask.config.get("distributed.admin.tick.limit"), default="ms"
)
LOG_PDB = dask.config.get("distributed.admin.pdb-on-err")
class Server:
""" Dask Distributed Server
Superclass for endpoints in a distributed cluster, such as Worker
and Scheduler objects.
**Handlers**
Servers define operations with a ``handlers`` dict mapping operation names
to functions. The first argument of a handler function will be a ``Comm``
"semaphore_refresh_leases": self.refresh_leases,
"semaphore_value": self.get_value,
}
)
self.scheduler.extensions["semaphores"] = self
# {metric_name: {semaphore_name: metric}}
self.metrics = {
"acquire_total": defaultdict(int), # counter
"release_total": defaultdict(int), # counter
"average_pending_lease_time": defaultdict(float), # gauge
"pending": defaultdict(int), # gauge
}
validation_callback_time = parse_timedelta(
dask.config.get("distributed.scheduler.locks.lease-validation-interval"),
default="s",
)
self._pc_lease_timeout = PeriodicCallback(
self._check_lease_timeout, validation_callback_time * 1000
)
self._pc_lease_timeout.start()
self.lease_timeout = parse_timedelta(
dask.config.get("distributed.scheduler.locks.lease-timeout"), default="s"
)
def get(self, timeout=None, **kwargs):
""" Get the value of this variable
Parameters
----------
timeout: number or string or timedelta, optional
Time in seconds to wait before timing out.
Instead of number of seconds, it is also possible to specify
a timedelta in string format, e.g. "200ms".
"""
timeout = parse_timedelta(timeout)
return self.client.sync(self._get, timeout=timeout, **kwargs)
def __init__(self, scheduler):
self.scheduler = scheduler
# { level: { task states } }
self.stealable_all = [set() for i in range(15)]
# { worker: { level: { task states } } }
self.stealable = dict()
# { task state: (worker, level) }
self.key_stealable = dict()
self.cost_multipliers = [1 + 2 ** (i - 6) for i in range(15)]
self.cost_multipliers[0] = 1
for worker in scheduler.workers:
self.add_worker(worker=worker)
callback_time = parse_timedelta(
dask.config.get("distributed.scheduler.work-stealing-interval"),
default="ms",
)
# `callback_time` is in milliseconds
pc = PeriodicCallback(callback=self.balance, callback_time=callback_time * 1000)
self._pc = pc
self.scheduler.periodic_callbacks["stealing"] = pc
self.scheduler.plugins.append(self)
self.scheduler.extensions["stealing"] = self
self.scheduler.events["stealing"] = deque(maxlen=100000)
self.count = 0
# { task state: }
self.in_flight = dict()
# { worker state: occupancy }
self.in_flight_occupancy = defaultdict(lambda: 0)