Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
will execute on every loop iteration. Signature is:
function(context)
Returns:
bool. True if step execution completed without error.
False if error occurred during step execution.
"""
logger.debug("starting")
context['retryCounter'] = counter
logger.info("retry: running step with counter %s", counter)
try:
step_method(context)
result = True
except (ControlOfFlowInstruction, Stop):
# Control-of-Flow/Stop are instructions to go somewhere
# else, not errors per se.
raise
except Exception as ex_info:
if self.max:
if counter == self.max:
logger.debug("retry: max %s retries exhausted. "
"raising error.", counter)
# arguably shouldn't be using errs for control of flow.
# but would lose the err info if not, so lesser of 2 evils.
raise
if self.stop_on or self.retry_on:
error_name = get_error_name(ex_info)
if self.stop_on:
formatted_stop_list = context.get_formatted_iterable(
"run. groups is None.")
try:
# run main steps
for step_group in groups:
self.run_step_group(step_group)
# if nothing went wrong, run on_success
if success_group:
logger.debug(
"pipeline steps complete. Running %s steps now.",
success_group)
self.run_step_group(success_group)
else:
logger.debug(
"pipeline steps complete. No success group specified.")
except (ControlOfFlowInstruction, Stop):
# Control-of-Flow/Stop are instructions to go somewhere
# else, not errors per se.
raise
except Exception:
# yes, yes, don't catch Exception. Have to, though, to run failure
# handler. Also, it does raise it back up.
if failure_group:
logger.error(
"Something went wrong. Will now try to run %s.",
failure_group)
# failure_step_group will log but swallow any errors
self.run_failure_step_group(failure_group)
else:
logger.debug(
"Something went wrong. No failure group specified.")
pipeline_context_input=pipe_arg,
context=child_context,
parse_input=not skip_parse,
loader=loader,
groups=step_groups,
success_group=success_group,
failure_group=failure_group
)
if out:
write_child_context_to_parent(out=out,
parent_context=context,
child_context=child_context)
logger.info("pyped %s.", pipeline_name)
except (ControlOfFlowInstruction, Stop):
# Control-of-Flow/Stop are instructions to go somewhere
# else, not errors per se.
raise
except Exception as ex_info:
# yes, yes, don't catch Exception. Have to, though, in order to swallow
# errs if !raise_error
logger.error("Something went wrong pyping %s. %s: %s",
pipeline_name, type(ex_info).__name__, ex_info)
if raise_error:
logger.debug("Raising original exception to caller.")
raise
else:
logger.debug(
"raiseError is False. Swallowing error in %s.", pipeline_name)
Args:
groups: list of str. List of step-groups to execute.
success_group: str. Step-group to execute on success condition.
failure_group: str. Step-group to execute on failure condition.
"""
self.groups = groups
self.success_group = success_group
self.failure_group = failure_group
class Call(ControlOfFlowInstruction):
    """Halt the current step, run a new step group, then resume the step.

    Defines no members of its own beyond what ControlOfFlowInstruction
    provides.
    """
class Jump(ControlOfFlowInstruction):
    """Halt step execution and jump to a new step group.

    Defines no members of its own beyond what ControlOfFlowInstruction
    provides.
    """
"""
def __init__(self, groups, success_group, failure_group):
    """Set up the control of flow instruction.

    The three arguments are stored unchanged on the instance under
    attributes of the same names.

    Args:
        groups: list of str. Step-groups to execute.
        success_group: str. Step-group to execute on success condition.
        failure_group: str. Step-group to execute on failure condition.
    """
    # One unpacking assignment; targets are still bound left to right, so
    # the attributes are set in the same order as three separate statements.
    (self.groups,
     self.success_group,
     self.failure_group) = (groups, success_group, failure_group)
class Call(ControlOfFlowInstruction):
    """Halt the current step, run a new step group, then resume the step.

    Defines no members of its own beyond what ControlOfFlowInstruction
    provides.
    """
class Jump(ControlOfFlowInstruction):
    """Halt step execution and jump to a new step group.

    Defines no members of its own beyond what ControlOfFlowInstruction
    provides.
    """