Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_mixed_sources(invoke, real_run):
    """Run the CLI with one file path plus one ``-m`` module.

    Expects a zero exit code, exactly one resource in the default registry,
    and two handlers for it, ``create_fn`` followed by ``update_fn``.
    """
    cli_args = ['run', 'handler1.py', '-m', 'package.module_2']
    outcome = invoke(cli_args)
    assert outcome.exit_code == 0

    default_registry = kopf.get_default_registry()
    assert len(default_registry.resources) == 1

    # Only one resource is registered, so the first one is the one to inspect.
    only_resource = next(iter(default_registry.resources))
    registered = default_registry._resource_changing_handlers[only_resource]._handlers
    assert len(registered) == 2

    first_handler, second_handler = registered[0], registered[1]
    assert first_handler.id == 'create_fn'
    assert second_handler.id == 'update_fn'
def test_setting_default_registry():
    """A registry installed with ``set_default_registry`` is returned by the getter.

    The check is by identity: the very same object must come back.
    """
    custom_registry = kopf.OperatorRegistry()
    kopf.set_default_registry(custom_registry)

    assert kopf.get_default_registry() is custom_registry
def test_on_delete_minimal(mocker):
    """``@kopf.on.delete`` with only the resource coordinates registers one handler.

    The handler must wrap the decorated function, carry ``Reason.DELETE``,
    and have every optional setting left as ``None``.
    """
    default_registry = kopf.get_default_registry()
    target = Resource('group', 'version', 'plural')
    fake_cause = mocker.MagicMock(resource=target, reason=Reason.DELETE)

    @kopf.on.delete('group', 'version', 'plural')
    def fn(**_):
        pass

    matched = default_registry.get_resource_changing_handlers(fake_cause)
    assert len(matched) == 1

    handler = matched[0]
    assert handler.fn is fn
    assert handler.reason == Reason.DELETE

    # None of the optional settings were passed to the decorator.
    for option in ('field', 'errors', 'timeout', 'retries', 'backoff'):
        assert getattr(handler, option) is None
def test_on_create_minimal(mocker):
    """``@kopf.on.create`` with only the resource coordinates registers one handler.

    The handler must wrap the decorated function, carry ``Reason.CREATE``,
    and have every optional setting left as ``None``.
    """
    default_registry = kopf.get_default_registry()
    target = Resource('group', 'version', 'plural')
    fake_cause = mocker.MagicMock(resource=target, reason=Reason.CREATE)

    @kopf.on.create('group', 'version', 'plural')
    def fn(**_):
        pass

    matched = default_registry.get_resource_changing_handlers(fake_cause)
    assert len(matched) == 1

    handler = matched[0]
    assert handler.fn is fn
    assert handler.reason == Reason.CREATE

    # None of the optional settings were passed to the decorator.
    for option in ('field', 'errors', 'timeout', 'retries', 'backoff'):
        assert getattr(handler, option) is None
def test_two_modules(invoke, real_run):
    """Run the CLI with two ``-m`` modules.

    Expects a zero exit code, exactly one resource in the default registry,
    and two handlers for it, ``create_fn`` followed by ``update_fn``.
    """
    cli_args = ['run', '-m', 'package.module_1', '-m', 'package.module_2']
    outcome = invoke(cli_args)
    assert outcome.exit_code == 0

    default_registry = kopf.get_default_registry()
    assert len(default_registry.resources) == 1

    # Only one resource is registered, so the first one is the one to inspect.
    only_resource = next(iter(default_registry.resources))
    registered = default_registry._resource_changing_handlers[only_resource]._handlers
    assert len(registered) == 2

    first_handler, second_handler = registered[0], registered[1]
    assert first_handler.id == 'create_fn'
    assert second_handler.id == 'update_fn'
def test_on_probe_minimal():
    """``@kopf.on.probe()`` without arguments registers one probe activity handler.

    The handler must wrap the decorated function, carry ``Activity.PROBE``,
    and have every optional setting left as ``None``.
    """
    default_registry = kopf.get_default_registry()

    @kopf.on.probe()
    def fn(**_):
        pass

    matched = default_registry.get_activity_handlers(activity=Activity.PROBE)
    assert len(matched) == 1

    handler = matched[0]
    assert handler.fn is fn
    assert handler.activity == Activity.PROBE

    # None of the optional settings were passed to the decorator.
    for option in ('errors', 'timeout', 'retries', 'backoff'):
        assert getattr(handler, option) is None
    assert handler.cooldown is None  # deprecated alias
def test_on_cleanup_minimal():
    """``@kopf.on.cleanup()`` without arguments registers one cleanup activity handler.

    The handler must wrap the decorated function, carry ``Activity.CLEANUP``,
    and have every optional setting left as ``None``.
    """
    default_registry = kopf.get_default_registry()

    @kopf.on.cleanup()
    def fn(**_):
        pass

    matched = default_registry.get_activity_handlers(activity=Activity.CLEANUP)
    assert len(matched) == 1

    handler = matched[0]
    assert handler.fn is fn
    assert handler.activity == Activity.CLEANUP

    # None of the optional settings were passed to the decorator.
    for option in ('errors', 'timeout', 'retries', 'backoff'):
        assert getattr(handler, option) is None
    assert handler.cooldown is None  # deprecated alias
def test_on_field_minimal(mocker):
    """``@kopf.on.field`` with only resource coordinates and a field registers one handler.

    The mocked cause is an update whose diff touches ``('field', 'subfield')``,
    matching the dotted ``'field.subfield'`` passed to the decorator. The
    handler must wrap the decorated function, have no reason, expose the field
    as a tuple of path segments, and leave every optional setting as ``None``.
    """
    registry = kopf.get_default_registry()
    resource = Resource('group', 'version', 'plural')
    diff = [('op', ('field', 'subfield'), 'old', 'new')]
    cause = mocker.MagicMock(resource=resource, reason=Reason.UPDATE, diff=diff)

    @kopf.on.field('group', 'version', 'plural', 'field.subfield')
    def fn(**_):
        pass

    handlers = registry.get_resource_changing_handlers(cause)
    assert len(handlers) == 1

    handler = handlers[0]
    assert handler.fn is fn
    assert handler.reason is None
    assert handler.field == ('field', 'subfield')
    assert handler.errors is None
    assert handler.timeout is None
    assert handler.retries is None
    # Also verify backoff, as the other minimal resource-changing handler tests do.
    assert handler.backoff is None
def test_one_file(invoke, real_run):
    """Run the CLI with a single file path.

    Expects a zero exit code, exactly one resource in the default registry,
    and a single handler for it with the id ``create_fn``.
    """
    cli_args = ['run', 'handler1.py']
    outcome = invoke(cli_args)
    assert outcome.exit_code == 0

    default_registry = kopf.get_default_registry()
    assert len(default_registry.resources) == 1

    # Only one resource is registered, so the first one is the one to inspect.
    only_resource = next(iter(default_registry.resources))
    registered = default_registry._resource_changing_handlers[only_resource]._handlers
    assert len(registered) == 1

    sole_handler = registered[0]
    assert sole_handler.id == 'create_fn'