Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def on_start(self):
self.events.on_start_was_called.set()
if ActorRegistry.get_by_urn(self.actor_urn) is not None:
self.events.actor_registered_before_on_start_was_called.set()
def test_get_by_urn_returns_none_if_not_found():
result = ActorRegistry.get_by_urn('urn:foo:bar')
assert result is None
def test_actors_may_be_looked_up_by_urn(actor_ref):
result = ActorRegistry.get_by_urn(actor_ref.actor_urn)
assert result == actor_ref
def reply(self, message, text, opts=None):
"""
Reply to the sender of the provided message with a message \
containing the provided text.
:param message: the message to reply to
:param text: the text to reply with
:param opts: A dictionary of additional values to add to metadata
:return: None
"""
metadata = Metadata(source=self.actor_urn,
dest=message['metadata']['source']).__dict__
metadata['opts'] = opts
message = Message(text=text, metadata=metadata,
should_log=message['should_log']).__dict__
dest_actor = ActorRegistry.get_by_urn(message['metadata']['dest'])
if dest_actor is not None:
dest_actor.tell(message)
else:
raise("Tried to send message to nonexistent actor")