Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
ProxyCall(
attr_path=['nested', 'method'], args=[1], kwargs={'a': 'b'}
),
),
(
{'command': 'pykka_getattr', 'attr_path': ['nested', 'attr']},
ProxyGetAttr(attr_path=['nested', 'attr']),
),
(
{
'command': 'pykka_setattr',
'attr_path': ['nested', 'attr'],
'value': 'abcdef',
},
ProxySetAttr(attr_path=['nested', 'attr'], value='abcdef'),
),
],
def _handle_receive(self, message):
"""Handles messages sent to the actor."""
message = messages._upgrade_internal_message(message)
if isinstance(message, messages._ActorStop):
return self._stop()
if isinstance(message, messages.ProxyCall):
callee = self._get_attribute_from_path(message.attr_path)
return callee(*message.args, **message.kwargs)
if isinstance(message, messages.ProxyGetAttr):
attr = self._get_attribute_from_path(message.attr_path)
return attr
if isinstance(message, messages.ProxySetAttr):
parent_attr = self._get_attribute_from_path(message.attr_path[:-1])
attr_name = message.attr_path[-1]
return setattr(parent_attr, attr_name, message.value)
return self.on_receive(message)
def defer(self, *args, **kwargs):
"""Call with :meth:`~pykka.ActorRef.tell` semantics.
Does not create or return a future.
If the call raises an exception, there is no future to set the
exception on. Thus, the actor's :meth:`~pykka.Actor.on_failure` hook
is called instead.
.. versionadded:: 2.0
"""
message = messages.ProxyCall(
attr_path=self._attr_path, args=args, kwargs=kwargs
)
return self.actor_ref.tell(message)
def __call__(self, *args, **kwargs):
"""Call with :meth:`~pykka.ActorRef.ask` semantics.
Returns a future which will yield the called method's return value.
If the call raises an exception is set on the future, and will be
reraised by :meth:`~pykka.Future.get`. If the future is left unused,
the exception will not be reraised. Either way, the exception will
also be logged. See :ref:`logging` for details.
"""
message = messages.ProxyCall(
attr_path=self._attr_path, args=args, kwargs=kwargs
)
return self.actor_ref.ask(message, block=False)
def _on_about_to_finish_callback(self):
"""Callback that performs a blocking actor call to the real callback.
This is passed to audio, which is allowed to call this code from the
audio thread. We pass execution into the core actor to ensure that
there is no unsafe access of state in core. This must block until
we get a response.
"""
self.core.actor_ref.ask(
ProxyCall(
attr_path=["playback", "_on_about_to_finish"],
args=[],
kwargs={},
)