Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Args:
step: (CommentedMap/dict) This is the actual step as it
exists in the pipeline yaml.
"""
if hasattr(step, 'lc'):
# line_no: optional. Has value only when the yaml
# round trip parser is in use.
self.line_no = step.lc.line
# line_col: optional. Has value only when the yaml
# round trip parser is in use.
self.line_col = step.lc.col
self.name = step.get('name', None)
if not self.name:
raise PipelineDefinitionError('step must have a name.')
logger.debug("%s is complex.", self.name)
self.in_parameters = step.get('in', None)
# description: optional. Write to stdout if exists and flagged.
self.description = step.get('description', None)
if self.description:
logger.notify("%s: %s", self.name, self.description)
else:
logger.debug("step name: %s", self.name)
# foreach: optional value. None by default.
self.foreach_items = step.get('foreach', None)
# retry: optional, defaults none.
# stop: optional. defaults None.
self.stop = while_definition.get('stop', None)
if self.stop is None and self.max is None:
logger.error("while decorator missing both max and stop.")
raise PipelineDefinitionError("the while decorator must have "
"either max or stop, or both. "
"But not neither. Note that "
"setting stop: False with no "
"max is an infinite loop. If "
"an infinite loop is really "
"what you want, set stop: False")
else:
# if it isn't a dict, pipeline configuration is wrong.
logger.error("while decorator definition incorrect.")
raise PipelineDefinitionError("while decorator must be a dict "
"(i.e a map) type.")
logger.debug("done")
# Reject anything that is not a dict first: a while decorator that is not a
# map means the pipeline configuration itself is wrong.
if not isinstance(while_definition, dict):
    logger.error("while decorator definition incorrect.")
    raise PipelineDefinitionError("while decorator must be a dict "
                                  "(i.e a map) type.")

# All keys are optional; the second argument to get() is the default used
# when the key is absent from the definition.
self.error_on_max = while_definition.get('errorOnMax', False)
self.max = while_definition.get('max', None)
self.sleep = while_definition.get('sleep', 0)
self.stop = while_definition.get('stop', None)

# At least one exit condition (max or stop) has to be supplied. The
# attributes above are deliberately assigned before this check runs.
missing_exit_condition = self.stop is None and self.max is None
if missing_exit_condition:
    logger.error("while decorator missing both max and stop.")
    raise PipelineDefinitionError("the while decorator must have "
                                  "either max or stop, or both. "
                                  "But not neither. Note that "
                                  "setting stop: False with no "
                                  "max is an infinite loop. If "
                                  "an infinite loop is really "
                                  "what you want, set stop: False")

logger.debug("done")
# Reject anything that is not a dict first: a retry decorator that is not a
# map means the pipeline configuration itself is wrong.
if not isinstance(retry_definition, dict):
    logger.error("retry decorator definition incorrect.")
    raise PipelineDefinitionError("retry decorator must be a dict "
                                  "(i.e a map) type.")

# All keys are optional; the second argument to get() is the default used
# when the key is absent from the definition.
self.max = retry_definition.get('max', None)
self.sleep = retry_definition.get('sleep', 0)
self.stop_on = retry_definition.get('stopOn', None)
self.retry_on = retry_definition.get('retryOn', None)

logger.debug("done")
mutate - after method execution will contain the new
updated context.
step_method: (method/function) This is the method/function that
will execute on every loop iteration. Signature is:
function(context)
"""
logger.debug("starting")
context['whileCounter'] = 0
if self.stop is None and self.max is None:
# the ctor already does this check, but guess theoretically
# consumer could have messed with the props since ctor
logger.error("while decorator missing both max and stop.")
raise PipelineDefinitionError("the while decorator must have "
"either max or stop, or both. "
"But not neither.")
error_on_max = context.get_formatted_as_type(
self.error_on_max, out_type=bool)
sleep = context.get_formatted_as_type(self.sleep, out_type=float)
if self.max is None:
max = None
logger.info("while decorator will loop until %s "
"evaluates to True at %ss intervals.",
self.stop, sleep)
else:
max = context.get_formatted_as_type(self.max, out_type=int)
if max < 1:
logger.info(