Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_purge_progress_when_already_empty_in_body_but_not_in__patch(handler):
body = {}
patch = {'status': {'kopf': {'progress': {'some-id': {'retries': 5}}}}}
origbody = copy.deepcopy(body)
state = State.from_body(body=body, handlers=[handler])
state.purge(patch=patch, body=body)
assert not patch
assert body == origbody # not modified
def test_sleeping_flag(handler, expected, body):
origbody = copy.deepcopy(body)
state = State.from_body(body=body, handlers=[handler])
result = state[handler.id].sleeping
assert result == expected
assert body == origbody # not modified
def test_awakening_time(handler, expected, body):
origbody = copy.deepcopy(body)
state = State.from_body(body=body, handlers=[handler])
result = state[handler.id].delayed
assert result == expected
assert body == origbody # not modified
def test_awakened_flag(handler, expected, body):
origbody = copy.deepcopy(body)
state = State.from_body(body=body, handlers=[handler])
result = state[handler.id].awakened
assert result == expected
assert body == origbody # not modified
def test_store_success(handler, expected_retries, expected_stopped, body):
origbody = copy.deepcopy(body)
patch = {}
state = State.from_body(body=body, handlers=[handler])
state = state.with_outcomes(outcomes={handler.id: HandlerOutcome(final=True)})
state.store(patch=patch)
assert patch['status']['kopf']['progress']['some-id']['success'] is True
assert patch['status']['kopf']['progress']['some-id']['failure'] is False
assert patch['status']['kopf']['progress']['some-id']['retries'] == expected_retries
assert patch['status']['kopf']['progress']['some-id']['stopped'] == expected_stopped
assert patch['status']['kopf']['progress']['some-id']['message'] is None
assert body == origbody # not modified
def test_get_retry_count(handler, expected, body):
origbody = copy.deepcopy(body)
state = State.from_body(body=body, handlers=[handler])
result = state[handler.id].retries
assert result == expected
assert body == origbody # not modified
def test_finished_flag(handler, expected, body):
origbody = copy.deepcopy(body)
state = State.from_body(body=body, handlers=[handler])
result = state[handler.id].finished
assert result == expected
assert body == origbody # not modified
logger = cause.logger
patch = cause.patch # TODO get rid of this alias
body = cause.body # TODO get rid of this alias
delay = None
done = None
skip = None
# Regular causes invoke the handlers.
if cause.reason in causation.HANDLER_REASONS:
title = causation.TITLES.get(cause.reason, repr(cause.reason))
logger.debug(f"{title.capitalize()} event: %r", body)
if cause.diff and cause.old is not None and cause.new is not None:
logger.debug(f"{title.capitalize()} diff: %r", cause.diff)
handlers = registry.get_resource_changing_handlers(cause=cause)
state = states.State.from_body(body=cause.body, handlers=handlers)
if handlers:
outcomes = await _execute_handlers(
lifecycle=lifecycle,
handlers=handlers,
cause=cause,
state=state,
)
state = state.with_outcomes(outcomes)
state.store(patch=cause.patch)
states.deliver_results(outcomes=outcomes, patch=cause.patch)
if state.done:
logger.info(f"All handlers succeeded for {title}.")
state.purge(patch=cause.patch, body=cause.body)
done = state.done