Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_base_future_set_is_not_implemented():
future = Future()
with pytest.raises(NotImplementedError):
future.set(None)
def test_base_future_set_exception_is_not_implemented():
future = Future()
with pytest.raises(NotImplementedError):
future.set_exception(None)
def test_base_future_get_is_not_implemented():
future = Future()
with pytest.raises(NotImplementedError):
future.get()
wait_timeout = eventlet.Timeout(timeout)
try:
with wait_timeout:
super(EventletEvent, self).wait()
except eventlet.Timeout as t:
if t is not wait_timeout:
raise
return False
else:
self.event.wait()
return True
class EventletFuture(Future):
"""
:class:`EventletFuture` implements :class:`pykka.Future` for use with
:class:`EventletActor`.
"""
event = None
def __init__(self):
super(EventletFuture, self).__init__()
self.event = eventlet.event.Event()
def get(self, timeout=None):
try:
return super(EventletFuture, self).get(timeout=timeout)
except NotImplementedError:
pass
from __future__ import absolute_import
import sys
import gevent
import gevent.event
import gevent.queue
from pykka import Actor, Future, Timeout
__all__ = ['GeventActor', 'GeventFuture']
class GeventFuture(Future):
"""
:class:`GeventFuture` implements :class:`pykka.Future` for use with
:class:`GeventActor`.
It encapsulates a :class:`gevent.event.AsyncResult` object which may be
used directly, though it will couple your code with gevent.
"""
#: The encapsulated :class:`gevent.event.AsyncResult`
async_result = None
def __init__(self, async_result=None):
super(GeventFuture, self).__init__()
if async_result is None:
async_result = gevent.event.AsyncResult()
self.async_result = async_result
from __future__ import absolute_import
import sys
import threading
from pykka import Actor, Future, Timeout, _compat
__all__ = ['ThreadingActor', 'ThreadingFuture']
class ThreadingFuture(Future):
"""
:class:`ThreadingFuture` implements :class:`Future` for use with
:class:`ThreadingActor `.
The future is implemented using a :class:`queue.Queue`.
The future does *not* make a copy of the object which is :meth:`set()
` on it. It is the setters responsibility to only pass
immutable objects or make a copy of the object before setting it on the
future.
.. versionchanged:: 0.14
Previously, the encapsulated value was a copy made with
:func:`copy.deepcopy`, unless the encapsulated value was a future, in
which case the original future was encapsulated.
"""
def _unwrap_result(self, result):
if isinstance(result, pykka.Future):
result = result.get()
return result