How to use the kopf.reactor.causation.Activity enum in kopf

To help you get started, we’ve selected a few kopf examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github zalando-incubator / kopf / tests / registries / test_decorators.py View on Github external
def test_on_probe_with_all_kwargs(mocker):
    """Check that ``@kopf.on.probe`` stores every supplied kwarg on the registered handler."""
    registry = OperatorRegistry()

    # Build the decorator first, then apply it by hand (same as the ``@`` syntax).
    register_probe = kopf.on.probe(
        id='id',
        registry=registry,
        errors=ErrorsMode.PERMANENT,
        timeout=123,
        retries=456,
        backoff=78,
    )

    def fn(**_):
        pass

    fn = register_probe(fn)

    handlers = registry.get_activity_handlers(activity=Activity.PROBE)
    assert len(handlers) == 1

    probe = handlers[0]
    assert probe.fn is fn
    assert probe.activity == Activity.PROBE
    assert probe.id == 'id'
    assert probe.errors == ErrorsMode.PERMANENT
    assert probe.timeout == 123
    assert probe.retries == 456
    assert probe.backoff == 78
    assert probe.cooldown == 78  # deprecated alias
github zalando-incubator / kopf / tests / registries / test_registering.py View on Github external
@pytest.mark.parametrize('activity', list(Activity))
def test_operator_registry_with_activity_with_minimal_signature(
        operator_registry_cls, activity):
    """A handler registered with only its function is returned for each activity."""
    registry = operator_registry_cls()
    registry.register_activity_handler(some_fn)

    found = registry.get_activity_handlers(activity=activity)
    assert len(found) == 1

    only_handler = found[0]
    assert only_handler.fn is some_fn
github zalando-incubator / kopf / tests / test_liveness.py View on Github external
async def test_liveness_with_reporting(liveness_url, liveness_registry):
    """The liveness endpoint returns each probe handler's result keyed by the handler id."""

    def fn1(**kwargs):
        return {'x': 100}

    def fn2(**kwargs):
        return {'y': '200'}

    # Register both probes in order, each under its own explicit id.
    for handler_id, probe_fn in (('id1', fn1), ('id2', fn2)):
        liveness_registry.register_activity_handler(
            fn=probe_fn, id=handler_id, activity=Activity.PROBE)

    async with aiohttp.ClientSession() as session, session.get(liveness_url) as response:
        payload = await response.json()
        assert isinstance(payload, dict)
        assert payload == {'id1': {'x': 100}, 'id2': {'y': '200'}}
github zalando-incubator / kopf / tests / test_liveness.py View on Github external
async def test_liveness_data_is_cached(liveness_url, liveness_registry):
    """A repeated liveness request still reports the counter value of the first one."""
    calls = 0

    def fn1(**kwargs):
        nonlocal calls
        calls += 1
        return {'counter': calls}

    liveness_registry.register_activity_handler(fn=fn1, id='id1', activity=Activity.PROBE)

    async with aiohttp.ClientSession() as session:
        # Same expectation for the first and for the repeated request: 1, not 2!
        for _attempt in range(2):
            async with session.get(liveness_url) as response:
                data = await response.json()
                assert isinstance(data, dict)
                assert data == {'id1': {'counter': 1}}
github zalando-incubator / kopf / tests / registries / test_registering.py View on Github external
@pytest.mark.parametrize('activity', list(Activity))
def test_operator_registry_with_activity_via_iter(
        operator_registry_cls, activity):
    """``iter_activity_handlers`` yields a pure iterator, empty for a fresh registry."""
    registry = operator_registry_cls()
    iterator = registry.iter_activity_handlers(activity=activity)

    assert isinstance(iterator, collections.abc.Iterator)

    # The result must not be any kind of materialised collection.
    not_allowed = (
        collections.abc.Collection,
        collections.abc.Container,
        (list, tuple),
    )
    for kind in not_allowed:
        assert not isinstance(iterator, kind)

    assert not list(iterator)
github zalando-incubator / kopf / kopf / reactor / running.py View on Github external
# Wait for all other root tasks to exit before cleaning up.
    # Beware: on explicit operator cancellation, there is no graceful period at all.
    try:
        current_task = asyncio.current_task()
        awaited_tasks = {task for task in root_tasks if task is not current_task}
        await _wait(awaited_tasks)
    except asyncio.CancelledError:
        logger.warning("Cleanup activity is not executed at all due to cancellation.")
        raise

    # Execute the cleanup activity after all other root tasks are presumably done.
    try:
        await handling.activity_trigger(
            lifecycle=lifecycles.all_at_once,
            registry=registry,
            activity=causation.Activity.CLEANUP,
        )
        await vault.close()
    except asyncio.CancelledError:
        logger.warning("Cleanup activity is only partially executed due to cancellation.")
        raise
github zalando-incubator / kopf / kopf / on.py View on Github external
def decorator(fn: registries.ActivityHandlerFn) -> registries.ActivityHandlerFn:
        """
        Register ``fn`` as a handler for the ``STARTUP`` activity.

        The registry (``actual_registry``) and the handler options (``id``,
        ``errors``, ``timeout``, ``retries``, ``backoff``, ``cooldown``) are taken
        from the enclosing scope, which is not part of this excerpt.

        Returns whatever ``register_activity_handler`` returns.
        """
        return actual_registry.register_activity_handler(
            fn=fn, id=id,
            errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
            activity=causation.Activity.STARTUP,
        )
github zalando-incubator / kopf / kopf / reactor / registries.py View on Github external
id='login_via_pykube',
                fn=cast(ActivityHandlerFn, piggybacking.login_via_pykube),
                activity=causation.Activity.AUTHENTICATION,
                errors=ErrorsMode.IGNORED,
                _fallback=True,
            )

        try:
            import kubernetes
        except ImportError:
            pass
        else:
            self.register_activity_handler(
                id='login_via_client',
                fn=cast(ActivityHandlerFn, piggybacking.login_via_client),
                activity=causation.Activity.AUTHENTICATION,
                errors=ErrorsMode.IGNORED,
                _fallback=True,
            )
github zalando-incubator / kopf / kopf / reactor / activities.py View on Github external
registry: registries.OperatorRegistry,
        vault: credentials.Vault,
        _activity_title: str = "Authentication",
) -> None:
    """ Retrieve the credentials once, successfully or not, and exit. """

    # Sleep most of the time waiting for a signal to re-auth.
    await vault.wait_for_emptiness()

    # Log initial and re-authentications differently, for readability.
    logger.info(f"{_activity_title} has been initiated.")

    activity_results = await handling.activity_trigger(
        lifecycle=lifecycles.all_at_once,
        registry=registry,
        activity=causation.Activity.AUTHENTICATION,
    )

    if activity_results:
        logger.info(f"{_activity_title} has finished.")
    else:
        logger.warning(f"{_activity_title} has failed: "
                       f"no credentials were retrieved from the login handlers.")

    # Feed the credentials into the vault, and unfreeze the re-authenticating clients.
    await vault.populate({str(handler_id): info for handler_id, info in activity_results.items()})
github zalando-incubator / kopf / kopf / reactor / registries.py View on Github external
warnings.warn("cooldown=... is deprecated, use backoff=...", DeprecationWarning)
            self.backoff = cooldown

    # @property cannot be used due to a data field definition with the same name.
    def __getattribute__(self, name: str) -> Any:
        """
        Resolve attribute access, redirecting the deprecated ``cooldown`` alias.

        Reading ``cooldown`` emits a ``DeprecationWarning`` and returns the value
        of ``backoff``; any other name goes through the regular attribute lookup.
        """
        if name == 'cooldown':
            # stacklevel=2 attributes the warning to the code that reads the attribute
            # rather than to this method, so the offending line is the one reported.
            warnings.warn("handler.cooldown is deprecated, use handler.backoff",
                          DeprecationWarning, stacklevel=2)
            return self.backoff
        return super().__getattribute__(name)


@dataclasses.dataclass
class ActivityHandler(BaseHandler):
    """
    A handler record for operator activities (see ``causation.Activity``).

    Adds the activity and a non-public fallback flag to the ``BaseHandler`` fields.
    """
    fn: ActivityHandlerFn  # type clarification
    # The activity this handler is registered for; None when not specified.
    # NOTE(review): presumably None means "any activity" -- confirm against the registry lookup.
    activity: Optional[causation.Activity] = None
    # Off by default; its exact effect is not visible in this class.
    _fallback: bool = False  # non-public!


@dataclasses.dataclass
class ResourceHandler(BaseHandler):
    fn: ResourceHandlerFn  # type clarification
    reason: Optional[causation.Reason]
    field: Optional[dicts.FieldPath]
    initial: Optional[bool] = None
    deleted: Optional[bool] = None  # used for mixed-in (initial==True) @on.resume handlers only.
    labels: Optional[bodies.Labels] = None
    annotations: Optional[bodies.Annotations] = None
    requires_finalizer: Optional[bool] = None

    @property
    def event(self) -> Optional[causation.Reason]: