Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.memory_target_fraction or self.memory_spill_fraction
):
try:
from zict import Buffer, File, Func
except ImportError:
raise ImportError(
"Please `python -m pip install zict` for spill-to-disk workers"
)
path = os.path.join(self.local_directory, "storage")
storage = Func(
partial(serialize_bytelist, on_error="raise"),
deserialize_bytes,
File(path),
)
target = int(float(self.memory_limit) * self.memory_target_fraction)
self.data = Buffer({}, storage, target, weight)
self.data.memory = self.data.fast
self.data.disk = self.data.slow
else:
self.data = dict()
self.actors = {}
self.loop = loop or IOLoop.current()
self.reconnect = reconnect
self.executor = executor or ThreadPoolExecutor(
self.nthreads, thread_name_prefix="Dask-Worker-Threads'"
)
self.actor_executor = ThreadPoolExecutor(
1, thread_name_prefix="Dask-Actor-Threads"
)
self.batched_stream = BatchedSend(interval="2ms", loop=self.loop)
self.name = name
def __init__(self, worker, target_memory, storage_path):
    """Set up a two-level key/value store attached to ``worker``.

    Level one (``self.l1``) is a ``zict.Buffer``. It keeps values in a
    ``LocalStore`` (``self.fast``) while their total ``safe_sizeof`` weight
    stays within ``target_memory``. It then spills them, serialized, to
    ``zict.File`` storage under ``storage_path`` (``self.slow``).

    Level two (``self.l2``) is a Redis client configured from the
    ``REDIS_HOST`` / ``REDIS_PORT`` / ``REDIS_DB`` settings defined
    elsewhere in the module.

    Parameters
    ----------
    worker:
        Owning worker object; only stored as ``self.worker`` here.
    target_memory:
        Weight budget (bytes, per the log message) for the in-memory tier.
    storage_path:
        Directory handed to ``zict.File`` for spilled values.
    """
    self.worker = worker

    # Same chain dask itself builds: fill local memory up to the target,
    # then spill to disk.
    spill_files = zict.File(storage_path)
    # on_error='raise' makes serialization failures propagate instead of
    # being swallowed.
    to_bytes = partial(protocol.serialize_bytelist, on_error='raise')
    self.slow = zict.Func(to_bytes, protocol.deserialize_bytes, spill_files)

    self.fast = LocalStore()

    def _value_weight(key, value):
        # Only the value's size counts against the memory budget.
        return safe_sizeof(value)

    self.l1 = zict.Buffer(self.fast, self.slow, target_memory, _value_weight)
    logger.info(f'Setting fast store memory limit {target_memory} bytes and disk spill path {storage_path}')

    self.l2 = redis.client.Redis(
        host=REDIS_HOST,
        port=int(REDIS_PORT),
        db=int(REDIS_DB),
    )
self.disk_func = Func(
functools.partial(serialize_bytelist, on_error="raise"),
deserialize_bytes,
File(self.disk_func_path),
)
if memory_limit == 0:
self.host_buffer = self.host_func
else:
self.host_buffer = Buffer(
self.host_func, self.disk_func, memory_limit, weight=weight
)
self.device_keys = set()
self.device_func = dict()
self.device_host_func = Func(device_to_host, host_to_device, self.host_buffer)
self.device_buffer = Buffer(
self.device_func, self.device_host_func, device_memory_limit, weight=weight
)
self.device = self.device_buffer.fast.d
self.host = self.host_buffer if memory_limit == 0 else self.host_buffer.fast.d
self.disk = None if memory_limit == 0 else self.host_buffer.slow.d
# For Worker compatibility only, where `fast` is host memory buffer
self.fast = self.host_buffer if memory_limit == 0 else self.host_buffer.fast
def setup(self, worker):
self.cache = Buffer(
fast={},
slow=Func(
dump=blosc.pack_array,
load=blosc.unpack_array,
d=Buffer(
fast={},
slow=LRU(
n=self._maxdisk,
d=File(os.path.join(worker.local_directory, 'cache')),
weight=lambda k, v: len(v),
),
n=self._maxcompressed,
weight=lambda k, v: len(v),
),
),
n=self._maxmem,
if not os.path.exists(local_directory):
os.makedirs(local_directory, exist_ok=True)
local_directory = os.path.join(local_directory, "dask-worker-space")
self.disk_func_path = os.path.join(local_directory, "storage")
self.host_func = dict()
self.disk_func = Func(
functools.partial(serialize_bytelist, on_error="raise"),
deserialize_bytes,
File(self.disk_func_path),
)
if memory_limit == 0:
self.host_buffer = self.host_func
else:
self.host_buffer = Buffer(
self.host_func, self.disk_func, memory_limit, weight=weight
)
self.device_keys = set()
self.device_func = dict()
self.device_host_func = Func(device_to_host, host_to_device, self.host_buffer)
self.device_buffer = Buffer(
self.device_func, self.device_host_func, device_memory_limit, weight=weight
)
self.device = self.device_buffer.fast.d
self.host = self.host_buffer if memory_limit == 0 else self.host_buffer.fast.d
self.disk = None if memory_limit == 0 else self.host_buffer.slow.d
# For Worker compatibility only, where `fast` is host memory buffer
self.fast = self.host_buffer if memory_limit == 0 else self.host_buffer.fast
def setup(self, worker):
    """Build the tiered array cache for ``worker`` and reset hit/miss counters.

    Tiers, from fastest to slowest:

    1. an in-memory dict of raw values, weighted by ``value.nbytes`` and
       bounded by ``self._maxmem``;
    2. an in-memory dict of ``blosc.pack_array``-packed bytes, weighted by
       ``len`` and bounded by ``self._maxcompressed``;
    3. an ``LRU`` over a ``File`` store in
       ``<worker.local_directory>/cache``, weighted by ``len`` and bounded by
       ``self._maxdisk``.

    Tier 1 spills into tier 2 through a ``Func`` wrapper that packs the
    values with ``blosc.pack_array`` and unpacks them with
    ``blosc.unpack_array``.
    """
    def packed_size(key, value):
        # The packed payloads are byte strings, so their length is their cost.
        return len(value)

    cache_dir = os.path.join(worker.local_directory, 'cache')
    disk_tier = LRU(n=self._maxdisk, d=File(cache_dir), weight=packed_size)
    packed_tier = Buffer(
        fast={},
        slow=disk_tier,
        n=self._maxcompressed,
        weight=packed_size,
    )
    codec = Func(dump=blosc.pack_array, load=blosc.unpack_array, d=packed_tier)

    self.cache = Buffer(
        fast={},
        slow=codec,
        n=self._maxmem,
        weight=lambda key, value: value.nbytes,
    )

    # NOTE(review): presumably guards concurrent cache access — confirm with callers.
    self.lock = Lock()
    self.hits = 0
    self.misses = 0
import os
import sys
from .common import ZictBase
def _encode_key(key):
return key.encode('utf-8')
def _decode_key(key):
return key.decode('utf-8')
class LMDB(ZictBase):
""" Mutable Mapping interface to a LMDB database.
Keys must be strings, values must be bytes
Parameters
----------
directory: string
Examples
--------
>>> z = LMDB('/tmp/somedir/') # doctest: +SKIP
>>> z['x'] = b'123' # doctest: +SKIP
>>> z['x'] # doctest: +SKIP
b'123'
"""
def __init__(self, directory):
from __future__ import absolute_import, division, print_function
from heapdict import heapdict
from .common import ZictBase, close
def do_nothing(k, v):
    """No-op callback: accept a key and a value, ignore both, return ``None``."""
    return None
class LRU(ZictBase):
""" Evict Least Recently Used Elements
Parameters
----------
n: int
Number of elements to keep, or total weight if weight= is used
d: MutableMapping
Dictionary in which to hold elements
on_evict: list of callables
Function:: k, v -> action to call on key value pairs prior to eviction
weight: callable
Function:: k, v -> number to determine the size of keeping the item in
the mapping. Defaults to ``(k, v) -> 1``
Examples
--------
def _safe_key(key):
"""
Escape key so as to be usable on all filesystems.
"""
# Even directory separators are unsafe.
return quote(key, safe="")
def _unsafe_key(key):
"""
Undo the escaping done by _safe_key().
"""
return unquote(key)
class File(ZictBase):
""" Mutable Mapping interface to a directory
Keys must be strings, values must be bytes
Note this shouldn't be used for interprocess persistence, as keys
are cached in memory.
Parameters
----------
directory: string
mode: string, ('r', 'w', 'a'), defaults to 'a'
Examples
--------
>>> z = File('myfile') # doctest: +SKIP
>>> z['x'] = b'123' # doctest: +SKIP
from itertools import chain
from .common import ZictBase, close
from .lru import LRU
class Buffer(ZictBase):
""" Buffer one dictionary on top of another
This creates a MutableMapping by combining two MutableMappings, one that
feeds into the other when it overflows, based on an LRU mechanism. When
the first evicts elements these get placed into the second. When an item
is retrieved from the second it is placed back into the first.
Parameters
----------
fast: MutableMapping
slow: MutableMapping
fast_to_slow_callbacks: list of callables
These functions run every time data moves from the fast to the slow
mapping. They take two arguments, a key and a value
slow_to_fast_callbacks: list of callables
These functions run every time data moves from the slow to the fast