Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_nothing_is_called_when_freeze_is_set(mocker, resource, caplog, assert_logs):
detect_cause = mocker.patch('kopf.reactor.causation.detect_resource_changing_cause')
handle_cause = mocker.patch('kopf.reactor.handling.handle_resource_changing_cause')
patch_obj = mocker.patch('kopf.clients.patching.patch_obj')
asyncio_sleep = mocker.patch('asyncio.sleep')
# Nothing of these is actually used, but we need to feed something.
# Except for namespace+name, which are used for the logger prefixes.
lifecycle = kopf.lifecycles.all_at_once
registry = OperatorRegistry()
event = {'object': {'metadata': {'namespace': 'ns1', 'name': 'name1'}}}
caplog.set_level(logging.DEBUG)
await resource_handler(
lifecycle=lifecycle,
registry=registry,
resource=resource,
memories=ResourceMemories(),
event=event,
freeze_mode=Toggle(True), # make it frozen!
replenished=asyncio.Event(),
event_queue=asyncio.Queue(),
)
assert not detect_cause.called
async def test_acquire(registry, handlers, resource, cause_mock, event_type,
caplog, assert_logs, k8s_mocked):
caplog.set_level(logging.DEBUG)
cause_mock.reason = Reason.ACQUIRE
event_queue = asyncio.Queue()
await resource_handler(
lifecycle=kopf.lifecycles.all_at_once,
registry=registry,
resource=resource,
memories=ResourceMemories(),
event={'type': event_type, 'object': cause_mock.body},
replenished=asyncio.Event(),
event_queue=event_queue,
)
assert not handlers.create_mock.called
assert not handlers.update_mock.called
assert not handlers.delete_mock.called
assert k8s_mocked.asyncio_sleep.call_count == 0
assert k8s_mocked.sleep_or_wait.call_count == 0
assert k8s_mocked.patch_obj.call_count == 1
assert event_queue.empty()
kopf.lifecycles.all_at_once,
kopf.lifecycles.one_by_one,
kopf.lifecycles.randomized,
kopf.lifecycles.shuffled,
kopf.lifecycles.asap,
])
def test_with_empty_input(lifecycle):
handlers = []
selected = lifecycle(handlers, body={})
assert isinstance(selected, (tuple, list))
assert len(selected) == 0
async def test_fatal_error_stops_handler(
registry, handlers, extrahandlers, resource, cause_mock, cause_type,
caplog, assert_logs, k8s_mocked):
caplog.set_level(logging.DEBUG)
name1 = f'{cause_type}_fn'
event_type = None if cause_type == Reason.RESUME else 'irrelevant'
cause_mock.reason = cause_type
handlers.create_mock.side_effect = PermanentError("oops")
handlers.update_mock.side_effect = PermanentError("oops")
handlers.delete_mock.side_effect = PermanentError("oops")
handlers.resume_mock.side_effect = PermanentError("oops")
await resource_handler(
lifecycle=kopf.lifecycles.one_by_one,
registry=registry,
resource=resource,
memories=ResourceMemories(),
event={'type': event_type, 'object': cause_mock.body},
replenished=asyncio.Event(),
event_queue=asyncio.Queue(),
)
assert handlers.create_mock.call_count == (1 if cause_type == Reason.CREATE else 0)
assert handlers.update_mock.call_count == (1 if cause_type == Reason.UPDATE else 0)
assert handlers.delete_mock.call_count == (1 if cause_type == Reason.DELETE else 0)
assert handlers.resume_mock.call_count == (1 if cause_type == Reason.RESUME else 0)
assert not k8s_mocked.sleep_or_wait.called
assert k8s_mocked.patch_obj.called
async def test_handlers_called_always(
registry, handlers, extrahandlers, resource, cause_mock, cause_type,
caplog, assert_logs, k8s_mocked):
caplog.set_level(logging.DEBUG)
cause_mock.reason = cause_type
await resource_handler(
lifecycle=kopf.lifecycles.all_at_once,
registry=registry,
resource=resource,
memories=ResourceMemories(),
event={'type': 'ev-type', 'object': cause_mock.body},
replenished=asyncio.Event(),
event_queue=asyncio.Queue(),
)
assert handlers.event_mock.call_count == 1
assert extrahandlers.event_mock.call_count == 1
event = handlers.event_mock.call_args_list[0][1]['event']
assert event['type'] == 'ev-type'
assert event['object'] is cause_mock.body
assert event['type'] == 'ev-type'
assert event['object'] is cause_mock.body
caplog.set_level(logging.DEBUG)
name1 = f'{cause_type}_fn'
event_type = None if cause_type == Reason.RESUME else 'irrelevant'
cause_mock.reason = cause_type
cause_mock.body.update({
'status': {'kopf': {'progress': {
'create_fn': {'retries': 100},
'update_fn': {'retries': 100},
'delete_fn': {'retries': 100},
'resume_fn': {'retries': 100},
}}}
})
await resource_handler(
lifecycle=kopf.lifecycles.one_by_one,
registry=registry,
resource=resource,
memories=ResourceMemories(),
event={'type': event_type, 'object': cause_mock.body},
replenished=asyncio.Event(),
event_queue=asyncio.Queue(),
)
assert not handlers.create_mock.called
assert not handlers.update_mock.called
assert not handlers.delete_mock.called
assert not handlers.resume_mock.called
# Progress is reset, as the handler is not going to retry.
assert not k8s_mocked.sleep_or_wait.called
assert k8s_mocked.patch_obj.called
async def test_all_logs_are_prefixed(registry, resource, handlers,
logstream, cause_type, cause_mock):
event_type = None if cause_type == Reason.RESUME else 'irrelevant'
cause_mock.reason = cause_type
await resource_handler(
lifecycle=kopf.lifecycles.all_at_once,
registry=registry,
resource=resource,
memories=ResourceMemories(),
event={'type': event_type, 'object': cause_mock.body},
replenished=asyncio.Event(),
event_queue=asyncio.Queue(),
)
lines = logstream.getvalue().splitlines()
assert lines # no messages means that we cannot test it
assert all(line.startswith('prefix [ns1/name1] ') for line in lines)
name1 = f'{cause_type}_fn'
name2 = f'{cause_type}_fn2'
event_type = None if cause_type == Reason.RESUME else 'irrelevant'
cause_mock.reason = cause_type
cause_mock.body.update({
'status': {'kopf': {'progress': {
'resume_fn': {'started': '1979-01-01T00:00:00', 'success': True},
'resume_fn2': {'started': '1979-01-01T00:00:00', 'success': True},
name1: {'started': '1979-01-01T00:00:00', 'success': True},
name2: {'started': '1979-01-01T00:00:00'},
}}}
})
await resource_handler(
lifecycle=kopf.lifecycles.one_by_one,
registry=registry,
resource=resource,
memories=ResourceMemories(),
event={'type': event_type, 'object': cause_mock.body},
replenished=asyncio.Event(),
event_queue=asyncio.Queue(),
)
assert extrahandlers.create_mock.call_count == (1 if cause_type == Reason.CREATE else 0)
assert extrahandlers.update_mock.call_count == (1 if cause_type == Reason.UPDATE else 0)
assert extrahandlers.delete_mock.call_count == (1 if cause_type == Reason.DELETE else 0)
assert extrahandlers.resume_mock.call_count == (1 if cause_type == Reason.RESUME else 0)
assert not k8s_mocked.sleep_or_wait.called
assert k8s_mocked.patch_obj.called
async def test_diffs_not_logged_if_absent(registry, resource, handlers, cause_type, cause_mock,
caplog, assert_logs, diff):
caplog.set_level(logging.DEBUG)
event_type = None if cause_type == Reason.RESUME else 'irrelevant'
cause_mock.reason = cause_type
cause_mock.diff = diff
await resource_handler(
lifecycle=kopf.lifecycles.all_at_once,
registry=registry,
resource=resource,
memories=ResourceMemories(),
event={'type': event_type, 'object': cause_mock.body},
replenished=asyncio.Event(),
event_queue=asyncio.Queue(),
)
assert_logs([
" event: ",
], prohibited=[
" diff: "
])
kopf.lifecycles.randomized,
kopf.lifecycles.shuffled,
kopf.lifecycles.asap,
])
def test_with_empty_input(lifecycle):
handlers = []
selected = lifecycle(handlers, body={})
assert isinstance(selected, (tuple, list))
assert len(selected) == 0