Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_resources():
    """Every resource with a registered handler shows up in ``registry.resources``."""
    coordinates = [
        ('group1', 'version1', 'plural1'),
        ('group2', 'version2', 'plural2'),
    ]

    reg = GlobalRegistry()
    for group, version, plural in coordinates:
        reg.register_cause_handler(group, version, plural, some_fn)

    known = reg.resources
    assert isinstance(known, collections.abc.Collection)
    assert len(known) == 2

    # Freshly constructed Resource objects are expected to be members.
    expected = [Resource(group, version, plural) for group, version, plural in coordinates]
    for item in expected:
        assert item in known
def test_url_of_builtin_resource_item_namespaced():
    """An empty-group resource item in a namespace resolves under ``/api/v1/namespaces/``."""
    expected = '/api/v1/namespaces/ns-a.b/plural/name-a.b'
    builtin = Resource('', 'v1', 'plural')
    assert builtin.get_url(namespace='ns-a.b', name='name-a.b') == expected
def test_url_of_builtin_resource_item_cluster_scoped():
    """An empty-group resource item without a namespace resolves directly under ``/api/v1/``."""
    expected = '/api/v1/plural/name-a.b'
    builtin = Resource('', 'v1', 'plural')
    assert builtin.get_url(name='name-a.b') == expected
def test_creation_with_all_args():
    """The three positional arguments are stored as ``group``, ``version``, ``plural``."""
    created = Resource('group', 'version', 'plural')
    observed = (created.group, created.version, created.plural)
    assert observed == ('group', 'version', 'plural')
def test_name_of_builtin_resource():
    """With an empty group, the resource ``name`` equals its plural."""
    assert Resource('', 'v1', 'plural').name == 'plural'
def test_on_update_with_all_kwargs(mocker):
    """``@kopf.on.update`` with all options registers one UPDATE handler in the given registry."""
    reg = OperatorRegistry()
    update_cause = mocker.MagicMock(
        resource=Resource('group', 'version', 'plural'),
        reason=Reason.UPDATE,
    )
    # Force the registry's matching to succeed so that label/annotation
    # filters do not depend on the mocked cause's body.
    mocker.patch('kopf.reactor.registries.match', return_value=True)

    @kopf.on.update(
        'group', 'version', 'plural',
        id='id',
        registry=reg,
        errors=ErrorsMode.PERMANENT,
        timeout=123,
        retries=456,
        backoff=78,
        labels={'somelabel': 'somevalue'},
        annotations={'someanno': 'somevalue'},
    )
    def fn(**_):
        pass

    found = reg.get_resource_changing_handlers(update_cause)
    assert len(found) == 1

    only = found[0]
    assert only.fn is fn
    assert only.reason == Reason.UPDATE
    assert only.field is None
def test_url_of_builtin_resource_list_namespaced():
    """Listing an empty-group resource in a namespace resolves to the namespaced plural URL."""
    expected = '/api/v1/namespaces/ns-a.b/plural'
    builtin = Resource('', 'v1', 'plural')
    assert builtin.get_url(namespace='ns-a.b') == expected
def test_on_update_minimal(mocker):
    """``@kopf.on.update`` with only the resource coordinates uses the default registry and defaults."""
    default_reg = kopf.get_default_registry()
    update_cause = mocker.MagicMock(
        resource=Resource('group', 'version', 'plural'),
        reason=Reason.UPDATE,
    )

    @kopf.on.update('group', 'version', 'plural')
    def fn(**_):
        pass

    found = default_reg.get_resource_changing_handlers(update_cause)
    assert len(found) == 1

    only = found[0]
    assert only.fn is fn
    assert only.reason == Reason.UPDATE
    assert only.field is None

    # None of the optional settings were passed; ``cooldown`` is the
    # deprecated alias and is checked last, as before.
    for option in ('errors', 'timeout', 'retries', 'backoff', 'cooldown'):
        assert getattr(only, option) is None
pass # ignore initial handlers in non-initial causes.
elif handler.initial and cause.deleted and not handler.deleted:
pass # ignore initial handlers on deletion, unless explicitly marked as usable.
elif match(handler=handler, body=cause.body, changed_fields=changed_fields):
yield handler
class OperatorRegistry:
    """
    A global registry is used for handling of multiple resources & activities.
    It is usually populated by the ``@kopf.on...`` decorators, but can also
    be explicitly created and used in the embedded operators.
    """
    # One registry for activity handlers (not keyed by resource).
    _activity_handlers: ActivityRegistry
    # Per-resource registries, keyed by the resource they belong to.
    _resource_watching_handlers: MutableMapping[resources_.Resource, ResourceWatchingRegistry]
    _resource_changing_handlers: MutableMapping[resources_.Resource, ResourceChangingRegistry]

    def __init__(self) -> None:
        """Create an empty registry with no activity or resource handlers."""
        # No explicit base class is declared here, so this call is a no-op
        # unless the class is combined with others via inheritance.
        super().__init__()
        self._activity_handlers = ActivityRegistry()
        # defaultdict: the per-resource registry is created on first access,
        # so simply indexing by a resource makes that resource "known".
        self._resource_watching_handlers = collections.defaultdict(ResourceWatchingRegistry)
        self._resource_changing_handlers = collections.defaultdict(ResourceChangingRegistry)

    @property
    def resources(self) -> FrozenSet[resources_.Resource]:
        """ All known resources in the registry. """
        # Union of the keys of both per-resource mappings.
        return frozenset(self._resource_watching_handlers) | frozenset(self._resource_changing_handlers)

    # NOTE(review): this definition looks like a truncated excerpt. The name and
    # the first parameter belong to an activity-handler registration, but the
    # remaining parameters, the return annotation and the body register a
    # resource-changing handler. The body also uses `group`, `version`,
    # `plural`, `reason`, `event` and `id`, none of which appear in the visible
    # signature, so as written it would fail with NameError when called
    # (unless `id` resolves to the builtin). Confirm against the full file
    # before relying on it.
    def register_activity_handler(
            self,
            fn: ActivityHandlerFn,
            field: Optional[dicts.FieldSpec] = None,
            errors: Optional[ErrorsMode] = None,
            timeout: Optional[float] = None,
            retries: Optional[int] = None,
            backoff: Optional[float] = None,
            cooldown: Optional[float] = None, # deprecated, use `backoff`
            initial: Optional[bool] = None,
            deleted: Optional[bool] = None,
            requires_finalizer: bool = False,
            labels: Optional[bodies.Labels] = None,
            annotations: Optional[bodies.Annotations] = None,
    ) -> ResourceHandlerFn:
        """
        Register an additional handler function for the specific resource and specific reason.
        """
        # Indexing the defaultdict creates the resource's registry on demand;
        # all options are forwarded unchanged to that registry's `register()`.
        resource = resources_.Resource(group, version, plural)
        return self._resource_changing_handlers[resource].register(
            reason=reason, event=event, field=field, fn=fn, id=id,
            errors=errors, timeout=timeout, retries=retries, backoff=backoff, cooldown=cooldown,
            initial=initial, deleted=deleted, requires_finalizer=requires_finalizer,
            labels=labels, annotations=annotations,
        )