This task spends most of its time sleeping forever, only running
at the beginning and at the end.
The root tasks do not actually start until the ready-flag is set,
which happens after the startup handlers have finished successfully.
Besides calling the startup/cleanup handlers, it performs a few operator-scoped
cleanups too (those that cannot be handled by garbage collection).
"""
# Execute the startup activity before any root task starts running (due to the readiness flag).
try:
await handling.activity_trigger(
lifecycle=lifecycles.all_at_once,
registry=registry,
activity=causation.Activity.STARTUP,
)
except asyncio.CancelledError:
logger.warning("Startup activity is only partially executed due to cancellation.")
raise
# Notify the caller that we are ready to be executed. This unfreezes all the root tasks.
await primitives.raise_flag(ready_flag)
# Sleep forever, or until cancelled, which happens when the operator begins its shutdown.
try:
await asyncio.Event().wait()
except asyncio.CancelledError:
pass
# Wait for all other root tasks to exit before cleaning up.
# Beware: on explicit operator cancellation, there is no graceful period at all.
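# A minimal standalone sketch (not the original operator code) of the lifecycle
# pattern described above: run the startup work, signal readiness, sleep forever
# until cancelled, then clean up. The `_sketch_*` names and the plain
# asyncio.Event readiness flag are hypothetical placeholders, not kopf's actual
# primitives.
import asyncio

async def _sketch_startup() -> None:
    ...  # whatever startup work the operator needs

async def _sketch_cleanup() -> None:
    ...  # whatever cleanup work the operator needs

async def _sketch_lifecycle_task(ready_flag: asyncio.Event) -> None:
    await _sketch_startup()
    ready_flag.set()                  # unfreeze the other root tasks
    try:
        await asyncio.Event().wait()  # sleep forever, or until cancelled
    except asyncio.CancelledError:
        pass
    await _sketch_cleanup()           # no graceful period on hard cancellation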
def _create_watching_cause(
resource: resources_.Resource,
event: bodies.Event,
) -> causation.ResourceWatchingCause:
return causation.detect_resource_watching_cause(
resource=resource,
event=event,
patch=patches.Patch(), # unused
type=event['type'], # unused
body=event['object'], # unused
raw=event, # unused
)
class HandlerRetriesError(PermanentError):
""" An error for the handler's retries exceeded (if set). """
class HandlerChildrenRetry(TemporaryError):
""" An internal pseudo-error to retry for the next sub-handlers attempt. """
# The task-local context; propagated down the stack instead of multiple kwargs.
# Used in `@kopf.on.this` and `kopf.execute()` to add/get the sub-handlers.
sublifecycle_var: ContextVar[lifecycles.LifeCycleFn] = ContextVar('sublifecycle_var')
subregistry_var: ContextVar[registries.ResourceChangingRegistry] = ContextVar('subregistry_var')
subexecuted_var: ContextVar[bool] = ContextVar('subexecuted_var')
handler_var: ContextVar[registries.BaseHandler] = ContextVar('handler_var')
cause_var: ContextVar[causation.BaseCause] = ContextVar('cause_var')
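# A standard-library sketch (not from the original source) of why these context
# variables work as task-local storage: values set inside a coroutine are seen
# by everything it awaits, but do not leak into sibling tasks. Names below are
# purely illustrative.
import asyncio
import contextvars

_demo_var: contextvars.ContextVar[str] = contextvars.ContextVar('_demo_var')

async def _nested() -> str:
    return _demo_var.get()              # sees the value set by its own task

async def _per_task(value: str) -> str:
    _demo_var.set(value)
    return await _nested()

async def _demo() -> None:
    # Each task gets its own copy of the context; no cross-task leakage.
    assert await asyncio.gather(_per_task('a'), _per_task('b')) == ['a', 'b']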
async def activity_trigger(
*,
lifecycle: lifecycles.LifeCycleFn,
registry: registries.OperatorRegistry,
activity: causation.Activity,
) -> Mapping[registries.HandlerId, registries.HandlerResult]:
"""
Execute a handling cycle until succeeded or permanently failed.
This mimics the behaviour of patching-watching in Kubernetes, but in-memory.
"""
logger = logging.getLogger(f'kopf.activities.{activity.value}')
# For the activity handlers, we have neither bodies nor patches, just the state.
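# A simplified sketch (not kopf's actual state machine) of "execute a handling
# cycle until succeeded or permanently failed": temporary errors cause an
# in-memory retry after a delay, permanent errors stop the cycle. Real handling
# also tracks per-handler state, retry counters, and configured limits.
import asyncio
from typing import Awaitable, Callable

class _SketchTemporaryError(Exception):
    delay = 1.0

class _SketchPermanentError(Exception):
    pass

async def _sketch_cycle(handler: Callable[[], Awaitable[object]]) -> object:
    while True:
        try:
            return await handler()
        except _SketchPermanentError:
            raise                           # give up: permanently failed
        except _SketchTemporaryError as e:
            await asyncio.sleep(e.delay)    # back off and retry in-memory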
def _guess_owner(
owner: Optional[bodies.Body],
) -> bodies.Body:
if owner is not None:
return owner
try:
cause = handling.cause_var.get()
except LookupError:
pass
else:
if cause is not None and isinstance(cause, causation.ResourceCause):
return cause.body
raise LookupError("Owner must be set explicitly, since running outside of a handler.")
def decorator(fn: registries.ResourceHandlerFn) -> registries.ResourceHandlerFn:
return actual_registry.register_resource_changing_handler(
group=group, version=version, plural=plural,
reason=causation.Reason.DELETE, id=id,
errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
fn=fn, requires_finalizer=bool(not optional),
labels=labels, annotations=annotations,
)
return self.backoff
else:
return super().__getattribute__(name)
@dataclasses.dataclass
class ActivityHandler(BaseHandler):
fn: ActivityHandlerFn # type clarification
activity: Optional[causation.Activity] = None
_fallback: bool = False # non-public!
@dataclasses.dataclass
class ResourceHandler(BaseHandler):
fn: ResourceHandlerFn # type clarification
reason: Optional[causation.Reason]
field: Optional[dicts.FieldPath]
initial: Optional[bool] = None
deleted: Optional[bool] = None # used for mixed-in (initial==True) @on.resume handlers only.
labels: Optional[bodies.Labels] = None
annotations: Optional[bodies.Annotations] = None
requires_finalizer: Optional[bool] = None
@property
def event(self) -> Optional[causation.Reason]:
warnings.warn("`handler.event` is deprecated; use `handler.reason`.", DeprecationWarning)
return self.reason
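# A generic illustration (not from the original source) of the same
# deprecation-shim pattern as the `event` property above: the old attribute
# keeps working, but emits a DeprecationWarning pointing to the new name.
import dataclasses
import warnings

@dataclasses.dataclass
class _ShimmedHandler:
    reason: str

    @property
    def event(self) -> str:
        warnings.warn("`event` is deprecated; use `reason`.", DeprecationWarning)
        return self.reason

with warnings.catch_warnings(record=True) as _caught:
    warnings.simplefilter("always")
    assert _ShimmedHandler(reason='create').event == 'create'
    assert _caught and issubclass(_caught[-1].category, DeprecationWarning)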
# We only type-check for known classes of handlers/callbacks, and ignore any custom subclasses.
HandlerFnT = TypeVar('HandlerFnT', ActivityHandlerFn, ResourceHandlerFn)
HandlerT = TypeVar('HandlerT', ActivityHandler, ResourceHandler)
def __init__(self) -> None:
super().__init__()
try:
import pykube
except ImportError:
pass
else:
self.register_activity_handler(
id='login_via_pykube',
fn=cast(ActivityHandlerFn, piggybacking.login_via_pykube),
activity=causation.Activity.AUTHENTICATION,
errors=ErrorsMode.IGNORED,
_fallback=True,
)
try:
import kubernetes
except ImportError:
pass
else:
self.register_activity_handler(
id='login_via_client',
fn=cast(ActivityHandlerFn, piggybacking.login_via_client),
activity=causation.Activity.AUTHENTICATION,
errors=ErrorsMode.IGNORED,
_fallback=True,
)
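# The handlers above are registered with `_fallback=True`: they are intended to
# be used only when the operator does not define a login handler of its own.
# A hedged usage sketch, assuming this kopf version exposes the public
# `@kopf.on.login()` decorator and `kopf.ConnectionInfo` (an assumption about
# the public API; the endpoint and token below are purely illustrative):
import kopf

@kopf.on.login()
def _sketch_login(**kwargs):
    return kopf.ConnectionInfo(
        server='https://127.0.0.1:6443',    # illustrative API endpoint
        token='...',                        # illustrative credentials
    )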
The synchronous methods are executed in an executor (threads or processes),
thus making them non-blocking for the main event loop of the operator.
See: https://pymotw.com/3/asyncio/executors.html
"""
# Add aliases for the kwargs, directly linked to the body, or to the assumed defaults.
if isinstance(cause, causation.BaseCause):
kwargs.update(
cause=cause,
logger=cause.logger,
)
if isinstance(cause, causation.ActivityCause):
kwargs.update(
activity=cause.activity,
)
if isinstance(cause, causation.ResourceCause):
kwargs.update(
patch=cause.patch,
memo=cause.memo,
body=cause.body,
spec=dicts.DictView(cause.body, 'spec'),
meta=dicts.DictView(cause.body, 'metadata'),
status=dicts.DictView(cause.body, 'status'),
uid=cause.body.get('metadata', {}).get('uid'),
name=cause.body.get('metadata', {}).get('name'),
namespace=cause.body.get('metadata', {}).get('namespace'),
)
if isinstance(cause, causation.ResourceWatchingCause):
kwargs.update(
event=cause.raw,
type=cause.type,
)
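# The kwargs assembled above are what eventually reach the operator's handlers
# as named parameters. A hedged usage sketch, assuming the usual
# `@kopf.on.create(group, version, plural)` registration (the resource names
# here are purely illustrative):
import kopf

@kopf.on.create('example.com', 'v1', 'widgets')
def _sketch_on_create(spec, meta, status, patch, logger, **kwargs):
    logger.info("Created %s in %s", meta.get('name'), meta.get('namespace'))
    patch.setdefault('status', {})['phase'] = 'Handled'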
Specifically, calculate the handler-specific fields (e.g. field diffs).
Ensure the global context for this asyncio task is set to the handler and
its cause -- for proper population of the sub-handlers via the decorators
(see `@kopf.on.this`).
"""
# For the field-handlers, the old/new/diff values must match the field, not the whole object.
if (True and # for readable indenting
isinstance(cause, causation.ResourceChangingCause) and
isinstance(handler, registries.ResourceHandler) and
handler.field is not None):
old = dicts.resolve(cause.old, handler.field, None, assume_empty=True)
new = dicts.resolve(cause.new, handler.field, None, assume_empty=True)
diff = diffs.reduce(cause.diff, handler.field)
cause = causation.enrich_cause(cause=cause, old=old, new=new, diff=diff)
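# A generic illustration (not kopf's actual `dicts.resolve`/`diffs.reduce`
# implementations) of the field-narrowing idea above: resolve a field path
# inside the old/new objects, and keep only the diff entries under that path,
# with the path prefix stripped. Simplified: edge cases are ignored.
from typing import Any, Mapping, Sequence, Tuple

def _resolve(obj: Any, field: Sequence[str], default: Any = None) -> Any:
    for key in field:
        if not isinstance(obj, Mapping) or key not in obj:
            return default
        obj = obj[key]
    return obj

def _reduce(diff: Sequence[Tuple[str, Tuple[str, ...], Any, Any]],
            field: Sequence[str]) -> list:
    prefix = tuple(field)
    return [(op, path[len(prefix):], old, new)
            for op, path, old, new in diff
            if path[:len(prefix)] == prefix]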
# Store the context of the current resource-object-event-handler, to be used in `@kopf.on.this`,
# and maybe other places, and consumed in the recursive `execute()` calls for the children.
# This replaces passing multiple kwargs through the whole call stack (easy to forget).
with invocation.context([
(sublifecycle_var, lifecycle),
(subregistry_var, registries.ResourceChangingRegistry(prefix=handler.id)),
(subexecuted_var, False),
(handler_var, handler),
(cause_var, cause),
]):
# And call it. If the sub-handlers are not called explicitly, run them implicitly,
# as if it were done inside of the handler (i.e. under a try-finally block).
result = await invocation.invoke(
handler.fn,
*args,