Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_actor_may_be_unregistered_multiple_times_without_error(actor_ref):
    """Unregistering the same actor twice must not raise.

    After each unregister call the actor is absent from the registry, and
    registering it again afterwards makes it visible once more.

    ``actor_ref`` is presumably supplied by a pytest fixture defined
    elsewhere -- confirm against the test module's conftest.
    """
    # Two back-to-back unregister calls: the second one hits an actor that
    # is already gone and must be tolerated silently.
    for _attempt in range(2):
        ActorRegistry.unregister(actor_ref)
        assert actor_ref not in ActorRegistry.get_all()

    # Put the actor back and check that it is listed again.
    ActorRegistry.register(actor_ref)
    assert actor_ref in ActorRegistry.get_all()
def test_actor_may_be_registered_manually(actor_ref):
    """An actor removed from the registry can be added back explicitly.

    ``actor_ref`` is presumably supplied by a pytest fixture defined
    elsewhere -- confirm against the test module's conftest.
    """
    registry = ActorRegistry

    # Start from a known state: the actor is not in the registry.
    registry.unregister(actor_ref)
    assert actor_ref not in registry.get_all()

    # A manual register call must make it show up in the listing.
    registry.register(actor_ref)
    assert actor_ref in registry.get_all()
def _handle_failure(self, exception_type, exception_value, traceback):
    """Log an unexpected failure, unregister the actor and flag it stopped.

    The three arguments are passed to the logger together as its
    ``exc_info`` triple, so the traceback is attached to the error record.
    """
    message = 'Unhandled exception in {}:'.format(self)
    failure_info = (exception_type, exception_value, traceback)
    logger.error(message, exc_info=failure_info)

    # Remove the actor from the registry before signalling that it stopped.
    ActorRegistry.unregister(self.actor_ref)
    self.actor_stopped.set()
# Stop the bookmarks actor (if one is set), waiting a bounded time for it.
# NOTE(review): this excerpt ends inside the ``except`` branch; the method may
# continue beyond what is visible here -- confirm against the full file.
def stop(self):
# Skip everything when ``self.bookmarks`` is falsy (no bookmarks actor set).
if self.bookmarks:
logger.debug('Stopping %s', self.bookmarks)
try:
# Bounded wait: ``timeout=1`` is presumably in seconds (pykka's
# ``ActorRef.stop``) -- confirm.
self.bookmarks.stop(timeout=1)
except pykka.Timeout:
# bookmarks actor may be waiting on backend
# The stop did not complete in time, so remove the actor from the registry
# explicitly instead of waiting any longer.
pykka.ActorRegistry.unregister(self.bookmarks)