Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_on_probe_with_all_kwargs(mocker):
    """Check that every option given to ``@kopf.on.probe`` lands on the handler."""
    registry = OperatorRegistry()
    options = dict(errors=ErrorsMode.PERMANENT, timeout=123, retries=456, backoff=78)

    @kopf.on.probe(id='id', registry=registry, **options)
    def fn(**_):
        pass

    handlers = registry.get_activity_handlers(activity=Activity.PROBE)
    assert len(handlers) == 1

    # Exactly one handler is expected; inspect it field by field.
    handler = handlers[0]
    assert handler.fn is fn
    assert handler.activity == Activity.PROBE
    assert handler.id == 'id'
    assert handler.errors == ErrorsMode.PERMANENT
    assert handler.timeout == 123
    assert handler.retries == 456
    assert handler.backoff == 78
    assert handler.cooldown == 78  # deprecated alias
@pytest.mark.parametrize('activity', list(Activity))
def test_operator_registry_with_activity_with_minimal_signature(
        operator_registry_cls, activity):
    """A handler registered with only its function is found for each activity."""
    registry = operator_registry_cls()
    registry.register_activity_handler(some_fn)

    found = registry.get_activity_handlers(activity=activity)
    assert len(found) == 1

    only_handler = found[0]
    assert only_handler.fn is some_fn
async def test_liveness_with_reporting(liveness_url, liveness_registry):
    """The liveness endpoint reports each probe handler's result under its id."""

    def fn1(**kwargs):
        return {'x': 100}

    def fn2(**kwargs):
        return {'y': '200'}

    # Register both probes in the same order as their ids.
    for handler_id, handler_fn in (('id1', fn1), ('id2', fn2)):
        liveness_registry.register_activity_handler(
            fn=handler_fn, id=handler_id, activity=Activity.PROBE)

    async with aiohttp.ClientSession() as session, session.get(liveness_url) as response:
        data = await response.json()
        assert isinstance(data, dict)
        assert data == {'id1': {'x': 100}, 'id2': {'y': '200'}}
async def test_liveness_data_is_cached(liveness_url, liveness_registry):
    """Two consecutive liveness requests must report the same probe result."""
    calls = 0

    def fn1(**kwargs):
        # Every real invocation bumps the counter, so a repeated value in
        # the responses means the probe was not invoked again.
        nonlocal calls
        calls += 1
        return {'counter': calls}

    liveness_registry.register_activity_handler(fn=fn1, id='id1', activity=Activity.PROBE)

    async with aiohttp.ClientSession() as session:
        # Same request twice within one session: the second response
        # must still carry counter == 1 -- not 2!
        for _ in range(2):
            async with session.get(liveness_url) as response:
                data = await response.json()
                assert isinstance(data, dict)
                assert data == {'id1': {'counter': 1}}
@pytest.mark.parametrize('activity', list(Activity))
def test_operator_registry_with_activity_via_iter(
        operator_registry_cls, activity):
    """``iter_activity_handlers`` yields a plain iterator, empty for a fresh registry."""
    registry = operator_registry_cls()
    iterator = registry.iter_activity_handlers(activity=activity)

    # It must be an iterator, and none of the materialised container kinds.
    assert isinstance(iterator, collections.abc.Iterator)
    for container_kind in (collections.abc.Collection, collections.abc.Container):
        assert not isinstance(iterator, container_kind)
    assert not isinstance(iterator, (list, tuple))

    # Nothing was registered, so nothing is yielded.
    assert not list(iterator)
# Wait for all other root tasks to exit before cleaning up.
# Beware: on explicit operator cancellation, there is no graceful period at all.
try:
    # Leave out the task that is running this very code from the awaited set.
    current_task = asyncio.current_task()
    awaited_tasks = {task for task in root_tasks if task is not current_task}
    await _wait(awaited_tasks)
except asyncio.CancelledError:
    # Cancelled while still waiting: the cleanup activity below is never reached.
    # The cancellation is only logged here and then re-raised, never suppressed.
    logger.warning("Cleanup activity is not executed at all due to cancellation.")
    raise

# Execute the cleanup activity after all other root tasks are presumably done.
try:
    await handling.activity_trigger(
        lifecycle=lifecycles.all_at_once,
        registry=registry,
        activity=causation.Activity.CLEANUP,
    )
    # The vault is closed only once the cleanup activity has been awaited.
    # NOTE(review): if the trigger raises, this close is skipped -- confirm that is intended.
    await vault.close()
except asyncio.CancelledError:
    # Cancelled somewhere inside the cleanup trigger or the vault closing: log and re-raise.
    logger.warning("Cleanup activity is only partially executed due to cancellation.")
    raise
def decorator(fn: registries.ActivityHandlerFn) -> registries.ActivityHandlerFn:
    """Register ``fn`` for the startup activity and return the registry's result."""
    # Error-handling options captured from the enclosing scope, passed through as-is.
    handling_options = dict(
        errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
    )
    return actual_registry.register_activity_handler(
        fn=fn, id=id, activity=causation.Activity.STARTUP, **handling_options,
    )
id='login_via_pykube',
fn=cast(ActivityHandlerFn, piggybacking.login_via_pykube),
activity=causation.Activity.AUTHENTICATION,
errors=ErrorsMode.IGNORED,
_fallback=True,
)
# Register the client-based login handler only if the `kubernetes` package
# can be imported. The import serves as an availability check: the imported
# name is not used in this fragment.
try:
    import kubernetes
except ImportError:
    # The package is not importable: skip this login handler silently.
    pass
else:
    # NOTE(review): `_fallback=True` presumably marks a built-in default handler
    # that yields to user-defined ones -- confirm against the registry code.
    self.register_activity_handler(
        id='login_via_client',
        fn=cast(ActivityHandlerFn, piggybacking.login_via_client),
        activity=causation.Activity.AUTHENTICATION,
        errors=ErrorsMode.IGNORED,
        _fallback=True,
    )
registry: registries.OperatorRegistry,
vault: credentials.Vault,
_activity_title: str = "Authentication",
) -> None:
""" Retrieve the credentials once, successfully or not, and exit. """
# Sleep most of the time waiting for a signal to re-auth.
await vault.wait_for_emptiness()
# Log initial and re-authentications differently, for readability.
logger.info(f"{_activity_title} has been initiated.")
activity_results = await handling.activity_trigger(
lifecycle=lifecycles.all_at_once,
registry=registry,
activity=causation.Activity.AUTHENTICATION,
)
if activity_results:
logger.info(f"{_activity_title} has finished.")
else:
logger.warning(f"{_activity_title} has failed: "
f"no credentials were retrieved from the login handlers.")
# Feed the credentials into the vault, and unfreeze the re-authenticating clients.
await vault.populate({str(handler_id): info for handler_id, info in activity_results.items()})
warnings.warn("cooldown=... is deprecated, use backoff=...", DeprecationWarning)
self.backoff = cooldown
# @property cannot be used due to a data field definition with the same name.
def __getattribute__(self, name: str) -> Any:
    """
    Resolve an attribute, serving ``cooldown`` as a deprecated alias of ``backoff``.

    Reading ``cooldown`` emits a ``DeprecationWarning`` and returns the value
    of ``backoff``. All other names are resolved by the regular lookup.
    """
    if name == 'cooldown':
        # stacklevel=2 attributes the warning to the code that reads
        # ``handler.cooldown`` instead of to this method, so the report
        # points at the line that needs to be migrated.
        warnings.warn("handler.cooldown is deprecated, use handler.backoff",
                      DeprecationWarning, stacklevel=2)
        return self.backoff
    return super().__getattribute__(name)
@dataclasses.dataclass
class ActivityHandler(BaseHandler):
    """
    A handler bound to an operator activity rather than to a resource.

    Extends ``BaseHandler`` with the activity it serves and an internal
    fallback marker. The field order defines the generated constructor.
    """

    fn: ActivityHandlerFn  # type clarification
    # The activity this handler serves.
    # NOTE(review): None presumably means "any activity" -- confirm in the registry.
    activity: Optional[causation.Activity] = None
    # Set to True by the registrations of the built-in login handlers.
    _fallback: bool = False  # non-public!
@dataclasses.dataclass
class ResourceHandler(BaseHandler):
fn: ResourceHandlerFn # type clarification
reason: Optional[causation.Reason]
field: Optional[dicts.FieldPath]
initial: Optional[bool] = None
deleted: Optional[bool] = None # used for mixed-in (initial==True) @on.resume handlers only.
labels: Optional[bodies.Labels] = None
annotations: Optional[bodies.Annotations] = None
requires_finalizer: Optional[bool] = None
@property
def event(self) -> Optional[causation.Reason]: