Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_wait_for_cs_completion_calls_get_cs_status(
self, mock_get_cs_status
):
mock_get_cs_status.side_effect = [
StackChangeSetStatus.PENDING, StackChangeSetStatus.READY
]
self.actions.wait_for_cs_completion(sentinel.change_set_name)
mock_get_cs_status.assert_called_with(sentinel.change_set_name)
def test_get_cs_status_handles_all_statuses(
self, mock_describe_change_set
):
scss = StackChangeSetStatus
return_values = { # NOQA
"Status": ('CREATE_PENDING', 'CREATE_IN_PROGRESS', 'CREATE_COMPLETE', 'DELETE_COMPLETE', 'FAILED'), # NOQA
"ExecutionStatus": { # NOQA
'UNAVAILABLE': (scss.PENDING, scss.PENDING, scss.PENDING, scss.DEFUNCT, scss.DEFUNCT), # NOQA
'AVAILABLE': (scss.PENDING, scss.PENDING, scss.READY, scss.DEFUNCT, scss.DEFUNCT), # NOQA
'EXECUTE_IN_PROGRESS': (scss.DEFUNCT, scss.DEFUNCT, scss.DEFUNCT, scss.DEFUNCT, scss.DEFUNCT), # NOQA
'EXECUTE_COMPLETE': (scss.DEFUNCT, scss.DEFUNCT, scss.DEFUNCT, scss.DEFUNCT, scss.DEFUNCT), # NOQA
'EXECUTE_FAILED': (scss.DEFUNCT, scss.DEFUNCT, scss.DEFUNCT, scss.DEFUNCT, scss.DEFUNCT), # NOQA
'OBSOLETE': (scss.DEFUNCT, scss.DEFUNCT, scss.DEFUNCT, scss.DEFUNCT, scss.DEFUNCT), # NOQA
} # NOQA
} # NOQA
for i, status in enumerate(return_values['Status']):
for exec_status, returns in \
return_values['ExecutionStatus'].items():
mock_describe_change_set.return_value = {
options=ctx.obj.get("options"),
output_format=ctx.obj.get("output_format"),
ignore_dependencies=ctx.obj.get("ignore_dependencies")
)
plan = SceptrePlan(context)
if change_set:
change_set_name = "-".join(["change-set", uuid1().hex])
plan.create_change_set(change_set_name)
try:
# Wait for change set to be created
statuses = plan.wait_for_cs_completion(change_set_name)
# Exit if change set fails to create
for status in list(statuses.values()):
if status != StackChangeSetStatus.READY:
exit(1)
# Describe changes
descriptions = plan.describe_change_set(change_set_name)
for description in list(descriptions.values()):
if not verbose:
description = simplify_change_set_description(description)
write(description, context.output_format)
# Execute change set if happy with changes
if yes or click.confirm("Proceed with stack update?"):
plan.execute_change_set(change_set_name)
except Exception as e:
raise e
finally:
# Clean up by deleting change set
def update_with_change_set(ctx, environment, stack, verbose):
"""
Updates the stack using a change set.
Creates a change set for ENVIRONMENT/STACK, prints out a description of the
changes, and prompts the user to decide whether to execute or delete it.
"""
stack = _get_stack(environment, stack)
change_set_name = "-".join(["change-set", uuid1().hex])
with change_set(stack, change_set_name):
status = stack.wait_for_cs_completion(change_set_name)
description = stack.describe_change_set(change_set_name)
if not verbose:
description = _simplify_change_set_description(description)
write(description, ctx.obj["output_format"])
if status != StackChangeSetStatus.READY:
exit(1)
if click.confirm("Proceed with stack update?"):
stack.execute_change_set(change_set_name)
raise UnknownStackChangeSetStatusError(
"ExecutionStatus {0} is unknown".format(cs_status)
)
if (
cs_status == "CREATE_COMPLETE" and
cs_exec_status == "AVAILABLE"
):
return StackChangeSetStatus.READY
elif (
cs_status in [
"CREATE_PENDING", "CREATE_IN_PROGRESS", "CREATE_COMPLETE"
] and
cs_exec_status in ["UNAVAILABLE", "AVAILABLE"]
):
return StackChangeSetStatus.PENDING
elif (
cs_status in ["DELETE_COMPLETE", "FAILED"] or
cs_exec_status in [
"EXECUTE_IN_PROGRESS", "EXECUTE_COMPLETE",
"EXECUTE_FAILED", "OBSOLETE"
]
):
return StackChangeSetStatus.DEFUNCT
else: # pragma: no cover
raise Exception("This else should not be reachable.")
def wait_for_cs_completion(self, change_set_name):
"""
Waits while the Stack Change Set status is "pending".
:param change_set_name: The name of the Change Set.
:type change_set_name: str
:returns: The Change Set's status.
:rtype: sceptre.stack_status.StackChangeSetStatus
"""
while True:
status = self._get_cs_status(change_set_name)
if status != StackChangeSetStatus.PENDING:
break
time.sleep(2)
return status
]
if cs_status not in possible_statuses:
raise UnknownStackChangeSetStatusError(
"Status {0} is unknown".format(cs_status)
)
if cs_exec_status not in possible_execution_statuses:
raise UnknownStackChangeSetStatusError(
"ExecutionStatus {0} is unknown".format(cs_status)
)
if (
cs_status == "CREATE_COMPLETE" and
cs_exec_status == "AVAILABLE"
):
return StackChangeSetStatus.READY
elif (
cs_status in [
"CREATE_PENDING", "CREATE_IN_PROGRESS", "CREATE_COMPLETE"
] and
cs_exec_status in ["UNAVAILABLE", "AVAILABLE"]
):
return StackChangeSetStatus.PENDING
elif (
cs_status in ["DELETE_COMPLETE", "FAILED"] or
cs_exec_status in [
"EXECUTE_IN_PROGRESS", "EXECUTE_COMPLETE",
"EXECUTE_FAILED", "OBSOLETE"
]
):
return StackChangeSetStatus.DEFUNCT
else: # pragma: no cover
return StackChangeSetStatus.READY
elif (
cs_status in [
"CREATE_PENDING", "CREATE_IN_PROGRESS", "CREATE_COMPLETE"
] and
cs_exec_status in ["UNAVAILABLE", "AVAILABLE"]
):
return StackChangeSetStatus.PENDING
elif (
cs_status in ["DELETE_COMPLETE", "FAILED"] or
cs_exec_status in [
"EXECUTE_IN_PROGRESS", "EXECUTE_COMPLETE",
"EXECUTE_FAILED", "OBSOLETE"
]
):
return StackChangeSetStatus.DEFUNCT
else: # pragma: no cover
raise Exception("This else should not be reachable.")