Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_broadcast_sends_message_to_all_actors_of_given_class_name(
    actor_a_class, actor_b_class
):
    """Broadcast with ``target_class`` given as the string ``'ActorA'``.

    Every actor ref the registry returns for ``actor_a_class`` must have
    the broadcast message in its ``received_messages``; every ref returned
    for ``actor_b_class`` must not.

    NOTE(review): both loops assert nothing when ``get_by_class`` yields no
    refs -- confirm the fixtures actually start actors of both classes.
    """
    ActorRegistry.broadcast({'command': 'foo'}, target_class='ActorA')
    expected = {'command': 'foo'}

    # Targeted class: the message must have arrived.
    for ref in ActorRegistry.get_by_class(actor_a_class):
        inbox = ref.proxy().received_messages.get()
        assert expected in inbox

    # Other class: the message must not have arrived.
    for ref in ActorRegistry.get_by_class(actor_b_class):
        inbox = ref.proxy().received_messages.get()
        assert expected not in inbox
def test_broadcast_sends_message_to_all_actors_of_given_class(
    actor_a_class, actor_b_class
):
    """Broadcast with ``target_class`` given as the class object itself.

    Refs registered under ``actor_a_class`` are expected to hold the
    broadcast message in ``received_messages``; refs registered under
    ``actor_b_class`` are expected not to.

    NOTE(review): if ``get_by_class`` returns nothing for a class, the
    corresponding loop makes no assertion -- confirm actors are running.
    """
    ActorRegistry.broadcast({'command': 'foo'}, target_class=actor_a_class)
    expected = {'command': 'foo'}

    for ref in ActorRegistry.get_by_class(actor_a_class):
        assert expected in ref.proxy().received_messages.get()

    for ref in ActorRegistry.get_by_class(actor_b_class):
        assert expected not in ref.proxy().received_messages.get()
def test_broadcast_sends_message_to_all_actors_if_no_target(
    a_actor_refs, b_actor_refs
):
    """Broadcast without ``target_class`` and check every registered actor.

    The fixture arguments are not referenced in the body; presumably they
    are requested so that actors are running before the broadcast
    (confirm against the fixture definitions).
    """
    ActorRegistry.broadcast({'command': 'foo'})
    expected = {'command': 'foo'}

    live_refs = ActorRegistry.get_all()
    # Guard against a vacuous pass: the registry must not be empty.
    assert live_refs

    for ref in live_refs:
        inbox = ref.proxy().received_messages.get()
        assert expected in inbox
def tell_tick():
    """Broadcast a ``{'msg': 'tick'}`` message via ``ActorRegistry``."""
    tick_message = {'msg': 'tick'}
    ActorRegistry.broadcast(tick_message)