# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def warn_dumps(obj, dumps=pickle.dumps, limit=1e6):
    """Dump an object to bytes; warn once per process if the payload is large.

    Parameters
    ----------
    obj : object
        Object to serialize.
    dumps : callable, optional
        Serializer producing ``bytes`` (defaults to ``pickle.dumps``).
    limit : float, optional
        Size in bytes above which the warning fires (default 1e6).

    Returns
    -------
    bytes
        The serialized payload.
    """
    b = dumps(obj)
    # _warn_dumps_warned is a module-level one-element list used as a mutable
    # "already warned" flag so the warning is emitted at most once per process.
    if not _warn_dumps_warned[0] and len(b) > limit:
        _warn_dumps_warned[0] = True
        s = str(obj)
        if len(s) > 70:
            # Abbreviate the repr so the warning message stays readable.
            s = s[:50] + " ... " + s[-15:]
        # The original call was truncated: it lacked the %-format arguments
        # and the closing parenthesis, and the function never returned `b`.
        warnings.warn(
            "Large object of size %s detected in task graph: \n"
            " %s\n"
            "Consider scattering large objects ahead of time\n"
            "with client.scatter to reduce scheduler burden and \n"
            "keep data on workers\n\n"
            " future = client.submit(func, big_data) # bad\n\n"
            " big_future = client.scatter(big_data) # good\n"
            " future = client.submit(func, big_future) # good"
            % (len(b), s)
        )
    return b
def _run(self, function, *args, **kwargs):
    """Run ``function`` on workers via the scheduler's broadcast RPC.

    Tornado coroutine body.  Pops the ``nanny`` (run on the nanny
    process) and ``workers`` (target subset) options from ``kwargs``,
    ships the pickled callable plus arguments to the targeted workers,
    and gathers per-worker results.  The first worker-side error
    encountered is re-raised locally.
    """
    run_on_nanny = kwargs.pop('nanny', False)
    target_workers = kwargs.pop('workers', None)
    message = dict(op='run',
                   function=dumps(function),
                   args=dumps(args),
                   kwargs=dumps(kwargs))
    responses = yield self.scheduler.broadcast(msg=message,
                                               workers=target_workers,
                                               nanny=run_on_nanny)
    results = {}
    for worker_addr, reply in responses.items():
        if reply['status'] == 'OK':
            results[worker_addr] = reply['result']
        elif reply['status'] == 'error':
            # Reconstruct and re-raise the remote exception with traceback.
            six.reraise(*clean_exception(**reply))
    raise gen.Return(results)
def dumps(x, *, buffer_callback=None):
    """ Manage between cloudpickle and pickle

    1. Try pickle
    2. If it is short then check if it contains __main__
    3. If it is long, then first check type, then check __main__

    Parameters
    ----------
    x : object
        Object to serialize.
    buffer_callback : callable, optional
        Receives each out-of-band pickle-5 buffer after a successful dump.

    Returns
    -------
    bytes
        The pickled payload.
    """
    buffers = []
    dump_kwargs = {"protocol": HIGHEST_PROTOCOL}
    if HIGHEST_PROTOCOL >= 5 and buffer_callback is not None:
        # Collect out-of-band buffers locally first; they are forwarded to
        # the caller's callback only once serialization has succeeded.
        dump_kwargs["buffer_callback"] = buffers.append
    try:
        buffers.clear()
        result = pickle.dumps(x, **dump_kwargs)
        if len(result) < 1000:
            if b"__main__" in result:
                # Payload references __main__: plain pickle would fail to
                # load in another process, so redo with cloudpickle.
                buffers.clear()
                result = cloudpickle.dumps(x, **dump_kwargs)
        elif not _always_use_pickle_for(x) and b"__main__" in result:
            buffers.clear()
            result = cloudpickle.dumps(x, **dump_kwargs)
    except Exception:
        try:
            # pickle failed outright; fall back to cloudpickle.
            buffers.clear()
            result = cloudpickle.dumps(x, **dump_kwargs)
        except Exception as e:
            logger.info("Failed to serialize %s. Exception: %s", x, e)
            raise
    # The original block was truncated here: the loop body forwarding the
    # collected buffers and the final return were missing.
    if buffer_callback is not None:
        for b in buffers:
            buffer_callback(b)
    return result
def _run_on_scheduler(self, function, *args, **kwargs):
    """Execute ``function`` directly inside the scheduler process.

    Tornado coroutine body: serializes the callable and its arguments,
    invokes the scheduler's ``run_function`` handler, then either
    re-raises the remote error or returns the remote result.
    """
    payload = {
        'function': dumps(function),
        'args': dumps(args),
        'kwargs': dumps(kwargs),
    }
    response = yield self.scheduler.run_function(**payload)
    if response['status'] == 'error':
        # Rebuild the scheduler-side exception with its traceback.
        six.reraise(*clean_exception(**response))
    raise gen.Return(response['result'])
Notes
-----
``Adaptive.workers_to_close`` dispatches to Scheduler.workers_to_close(),
but may be overridden in subclasses.
Returns
-------
List of worker addresses to close, if any
See Also
--------
Scheduler.workers_to_close
"""
# NOTE(review): fragment — the enclosing `async def workers_to_close(...)`
# header and the start of its docstring are missing from this chunk;
# `target` presumably comes from that signature — TODO confirm.
# Delegates the decision to the scheduler; `self.worker_key`, when set, is
# pickled for transmission over the RPC boundary.
return await self.scheduler.workers_to_close(
target=target,
key=pickle.dumps(self.worker_key) if self.worker_key else None,
attribute="name",
**self._workers_to_close_kwargs
)
def _run_coroutine(self, function, *args, **kwargs):
    """Broadcast a coroutine ``function`` to workers and collect results.

    Tornado coroutine body.  Pops the ``workers`` (target subset) and
    ``wait`` (block until completion) options from ``kwargs`` before
    serializing the remainder.  When ``wait`` is false, returns None
    immediately; otherwise gathers per-worker results and re-raises the
    first worker-side error encountered.
    """
    target_workers = kwargs.pop('workers', None)
    block = kwargs.pop('wait', True)
    message = dict(op='run_coroutine',
                   function=dumps(function),
                   args=dumps(args),
                   kwargs=dumps(kwargs),
                   wait=block)
    responses = yield self.scheduler.broadcast(msg=message,
                                               workers=target_workers)
    if not block:
        raise gen.Return(None)
    results = {}
    for worker_addr, reply in responses.items():
        if reply['status'] == 'OK':
            results[worker_addr] = reply['result']
        elif reply['status'] == 'error':
            # Reconstruct and re-raise the remote exception locally.
            six.reraise(*clean_exception(**reply))
    raise gen.Return(results)
def cuda_dumps(x):
    """Serialize ``x`` with the CUDA serializer registered for its type.

    Returns a ``(header, frames)`` pair.  Raises NotImplementedError when
    no CUDA serializer is registered for ``type(x)``.
    """
    cls = type(x)
    type_name = typename(cls)
    try:
        serializer = cuda_serialize.dispatch(cls)
    except TypeError:
        raise NotImplementedError(type_name)
    header, frames = serializer(x)
    header["type-serialized"] = pickle.dumps(cls)
    header["serializer"] = "cuda"
    # GPU frames are passed through uncompressed.
    header["compression"] = (False,) * len(frames)
    return header, frames
def _run(self, function, *args, **kwargs):
    """Tornado coroutine: broadcast ``function`` to workers through the
    scheduler and return a mapping of worker address -> result.

    ``nanny`` and ``workers`` are consumed from ``kwargs`` as broadcast
    options; the first worker-reported error is re-raised locally.
    """
    opts = {'nanny': kwargs.pop('nanny', False),
            'workers': kwargs.pop('workers', None)}
    responses = yield self.scheduler.broadcast(
        msg={'op': 'run',
             'function': dumps(function),
             'args': dumps(args),
             'kwargs': dumps(kwargs)},
        workers=opts['workers'],
        nanny=opts['nanny'],
    )
    results = {}
    for addr, response in responses.items():
        status = response['status']
        if status == 'OK':
            results[addr] = response['result']
        elif status == 'error':
            # Re-raise the remote exception with its original traceback.
            six.reraise(*clean_exception(**response))
    raise gen.Return(results)
# NOTE(review): fragment — the enclosing method header, the `if` branch that
# pairs with the `elif` below, and the close of the final dict are all
# missing from this chunk; `key` comes from the surrounding scope.
# Looks up cached size/type for the finished task, computing and caching
# them from the stored value when absent.
nbytes = self.nbytes.get(key)
typ = self.types.get(key)
if nbytes is None or typ is None:
try:
value = self.data[key]
except KeyError:
# Not in plain data storage; presumably an actor — TODO confirm.
value = self.actors[key]
nbytes = self.nbytes[key] = sizeof(value)
typ = self.types[key] = type(value)
# Drop the local reference so large values are not kept alive here.
del value
try:
typ_serialized = dumps_function(typ)
except PicklingError:
# Some types fail pickling (example: _thread.lock objects),
# send their name as a best effort.
typ_serialized = pickle.dumps(typ.__name__)
# Success message reported to the scheduler for this task.
d = {
"op": "task-finished",
"status": "OK",
"key": key,
"nbytes": nbytes,
"thread": self.threads.get(key),
"type": typ_serialized,
"typename": typename(typ),
}
elif key in self.exceptions:
# Error message; the dict is truncated by the chunk boundary below.
d = {
"op": "task-erred",
"status": "error",
"key": key,
"thread": self.threads.get(key),
"exception": self.exceptions[key],
def _run_on_scheduler(self, function, *args, **kwargs):
    """Tornado coroutine: run ``function`` inside the scheduler process,
    re-raising any scheduler-side error, otherwise returning its result.
    """
    response = yield self.scheduler.run_function(
        function=dumps(function),
        args=dumps(args),
        kwargs=dumps(kwargs),
    )
    status = response['status']
    if status == 'error':
        # Surface the remote failure with its original traceback.
        six.reraise(*clean_exception(**response))
    else:
        raise gen.Return(response['result'])