Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_actors_may_be_looked_up_by_class(
actor_a_class, a_actor_refs, b_actor_refs
):
result = ActorRegistry.get_by_class(actor_a_class)
for a_actor in a_actor_refs:
assert a_actor in result
for b_actor in b_actor_refs:
assert b_actor not in result
def test_broadcast_sends_message_to_all_actors_of_given_class_name(
actor_a_class, actor_b_class
):
ActorRegistry.broadcast({'command': 'foo'}, target_class='ActorA')
for actor_ref in ActorRegistry.get_by_class(actor_a_class):
received_messages = actor_ref.proxy().received_messages.get()
assert {'command': 'foo'} in received_messages
for actor_ref in ActorRegistry.get_by_class(actor_b_class):
received_messages = actor_ref.proxy().received_messages.get()
assert {'command': 'foo'} not in received_messages
def test_broadcast_sends_message_to_all_actors_of_given_class_name(
actor_a_class, actor_b_class
):
ActorRegistry.broadcast({'command': 'foo'}, target_class='ActorA')
for actor_ref in ActorRegistry.get_by_class(actor_a_class):
received_messages = actor_ref.proxy().received_messages.get()
assert {'command': 'foo'} in received_messages
for actor_ref in ActorRegistry.get_by_class(actor_b_class):
received_messages = actor_ref.proxy().received_messages.get()
assert {'command': 'foo'} not in received_messages
def send(cls, event, **kwargs):
listeners = pykka.ActorRegistry.get_by_class(cls)
logger.debug('Sending %s to %s: %s', event, cls.__name__, kwargs)
for listener in listeners:
# Save time by calling methods on Pykka actor without creating a
# throwaway actor proxy.
#
# Because we use `.tell()` there is no return channel for any errors,
# so Pykka logs them immediately. The alternative would be to use
# `.ask()` and `.get()` the returned futures to block for the listeners
# to react and return their exceptions to us. Since emitting events in
# practise is making calls upwards in the stack, blocking here would
# quickly deadlock.
listener.tell({
'command': 'pykka_call',
'attr_path': ('on_event',),
'args': (event,),
'kwargs': kwargs,
def number_of_connections(self):
return len(pykka.ActorRegistry.get_by_class(self.protocol))
def stop_actors_by_class(klass):
actors = pykka.ActorRegistry.get_by_class(klass)
logger.debug('Stopping %d instance(s) of %s', len(actors), klass.__name__)
for actor in actors:
actor.stop()