Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_actor_with_callable_mock_property_does_not_work(
    actor_class, stop_all, mocker
):
    """A callable mock installed as a property is not usable via the proxy.

    ``Mock`` (like ``MagicMock``) is callable by default, so the proxy wraps
    the attribute in a ``CallableProxy`` instead of treating it as a
    property. The wrapped attribute therefore has no ``get`` to call.
    """
    prop_mock = mocker.Mock()
    prop_mock.__get__ = mocker.Mock(return_value='mocked property value')
    # Precondition for the scenario: the mock really is seen as callable.
    assert isinstance(prop_mock, _compat.Callable)

    actor_class.a_rw_property = prop_mock
    actor_proxy = actor_class.start().proxy()

    with pytest.raises(AttributeError) as raised:
        assert actor_proxy.a_rw_property.get()

    expected_message = "'CallableProxy' object has no attribute 'get'"
    assert expected_message in str(raised.value)
def test_actor_with_noncallable_mock_property_works(
    actor_class, stop_all, mocker
):
    """A ``NonCallableMock`` installed as a property works via the proxy.

    Because the mock is not callable, the attribute still behaves as a
    property when accessed through the proxy, and its ``__get__`` is
    invoked exactly once for the single read performed here.
    """
    expected_value = 'mocked property value'

    prop_mock = mocker.NonCallableMock()
    prop_mock.__get__ = mocker.Mock(return_value=expected_value)
    # Precondition for the scenario: the mock must not be seen as callable.
    assert not isinstance(prop_mock, _compat.Callable)

    actor_class.a_rw_property = prop_mock
    actor_proxy = actor_class.start().proxy()

    assert actor_proxy.a_rw_property.get() == expected_value
    assert prop_mock.__get__.call_count == 1
def get_value():
    """Return the result of ``_compat.await_keyword`` applied to ``future``.

    ``future`` is not a parameter; it is taken from the enclosing scope.
    """
    awaited = _compat.await_keyword(future)
    return awaited
def get(self, timeout=None):
    """Return the future's value, waiting on the internal queue if needed.

    The superclass ``get`` is tried first; only when it raises
    :exc:`NotImplementedError` does this method fall back to the queue.

    :param timeout: passed as the timeout of the blocking queue read
    :type timeout: float or :class:`None`

    :raise: ``Timeout`` if the queue read raises ``queue.Empty``
    :raise: the stored exception, if the result carries ``exc_info``
    :return: the stored ``value``
    """
    try:
        return super(ThreadingFuture, self).get(timeout=timeout)
    except NotImplementedError:
        # The superclass has nothing to return; read from the queue below.
        pass

    try:
        payload = self._data
        if payload is None:
            # First read: block on the queue and cache the result on self.
            payload = self._data = self._queue.get(True, timeout)
        if 'exc_info' not in payload:
            return payload['value']
        # Kept inside the ``try`` so that a re-raised ``queue.Empty`` is
        # still converted to ``Timeout`` by the handler below.
        _compat.reraise(*payload['exc_info'])
    except _compat.queue.Empty:
        raise Timeout('{} seconds'.format(timeout))
def broadcast(cls, message, target_class=None):
    """
    Send ``message`` to every actor matching ``target_class``.

    When ``target_class`` is omitted, every actor returned by
    ``cls.get_all()`` receives the message. A string is looked up with
    ``cls.get_by_class_name()``; anything else is looked up with
    ``cls.get_by_class()``.

    :param message: the message to send
    :type message: any

    :param target_class: optional actor class to broadcast the message to
    :type target_class: class or class name
    """
    if target_class is None:
        recipients = cls.get_all()
    elif isinstance(target_class, _compat.string_types):
        recipients = cls.get_by_class_name(target_class)
    else:
        recipients = cls.get_by_class(target_class)

    for actor_ref in recipients:
        actor_ref.tell(message)
def _is_callable_attribute(self, attr):
    """Return whether ``attr`` is an instance of ``_compat.Callable``."""
    callable_type = _compat.Callable
    return isinstance(attr, callable_type)
def get(self, timeout=None):
    """Return the future's value, waiting on the internal queue if needed.

    The superclass ``get`` is tried first; only when it raises
    :exc:`NotImplementedError` does this method fall back to the queue.

    :param timeout: passed as the timeout of the blocking queue read
    :type timeout: float or :class:`None`

    :raise: ``Timeout`` if the queue read raises ``queue.Empty``
    :raise: the stored exception, if the result carries ``exc_info``
    :return: the stored ``value``
    """
    try:
        return super(ThreadingFuture, self).get(timeout=timeout)
    except NotImplementedError:
        # The superclass has nothing to return; read from the queue below.
        pass

    try:
        payload = self._data
        if payload is None:
            # First read: block on the queue and cache the result on self.
            payload = self._data = self._queue.get(True, timeout)
        if 'exc_info' not in payload:
            return payload['value']
        # Kept inside the ``try`` so that a re-raised ``queue.Empty`` is
        # still converted to ``Timeout`` by the handler below.
        _compat.reraise(*payload['exc_info'])
    except _compat.queue.Empty:
        raise Timeout('{} seconds'.format(timeout))
>>> f = pykka.ThreadingFuture()
>>> g = f.reduce(lambda x, y: x + y, 5)
>>> f.set([])
>>> g.get()
5
.. versionadded:: 1.2
"""
future = self.__class__()
future.set_get_hook(
lambda timeout: functools.reduce(func, self.get(timeout), *args)
)
return future
# Bind the one ``_compat`` helper under both names, so that ``__iter__`` is
# the very same object as ``__await__``.
__await__ = __iter__ = _compat.await_dunder_future
def get_all(futures, timeout=None):
"""
Collect all values encapsulated in the list of futures.
If ``timeout`` is not :class:`None`, the method will wait for a reply for
``timeout`` seconds, and then raise :exc:`pykka.Timeout`.
:param futures: futures for the results to collect
:type futures: list of :class:`pykka.Future`
:param timeout: seconds to wait before timeout
:type timeout: float or :class:`None`