Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
kwargs[Field.Resource.value] = 'arn:aws:states:::sagemaker:createTrainingJob.sync'
else:
kwargs[Field.Resource.value] = 'arn:aws:states:::sagemaker:createTrainingJob'
if isinstance(job_name, str):
parameters = training_config(estimator=estimator, inputs=data, job_name=job_name, mini_batch_size=mini_batch_size)
else:
parameters = training_config(estimator=estimator, inputs=data, mini_batch_size=mini_batch_size)
if estimator.debugger_hook_config != None:
parameters['DebugHookConfig'] = estimator.debugger_hook_config._to_request_dict()
if estimator.rules != None:
parameters['DebugRuleConfigurations'] = [rule.to_debugger_rule_config_dict() for rule in estimator.rules]
if isinstance(job_name, (ExecutionInput, StepInput)):
parameters['TrainingJobName'] = job_name
if hyperparameters is not None:
parameters['HyperParameters'] = hyperparameters
if experiment_config is not None:
parameters['ExperimentConfig'] = experiment_config
if 'S3Operations' in parameters:
del parameters['S3Operations']
if tags:
parameters['Tags'] = tags_dict_to_kv_list(tags)
kwargs[Field.Parameters.value] = parameters
super(TrainingStep, self).__init__(state_id, **kwargs)
def _replace_placeholders(self, params):
if not isinstance(params, dict):
return params
modified_parameters = {}
for k, v in params.items():
if isinstance(v, (ExecutionInput, StepInput)):
modified_key = "{key}.$".format(key=k)
modified_parameters[modified_key] = v.to_jsonpath()
elif isinstance(v, dict):
modified_parameters[k] = self._replace_placeholders(v)
elif isinstance(v, list):
modified_parameters[k] = [self._replace_placeholders(i) for i in v]
else:
modified_parameters[k] = v
return modified_parameters
def _validate_next_step_params(self, params, step_output):
for k, v in params.items():
if isinstance(v, StepInput):
if v is not step_output and not step_output.contains(v):
return False, k
elif isinstance(v, dict):
valid, invalid_param_name = self._validate_next_step_params(v, step_output)
if not valid:
return valid, invalid_param_name
return True, None
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
state_type (str): Type of the state. (Allowed values: `'Pass'`, `'Succeed'`, `'Fail'`, `'Wait'`, `'Task'`, `'Choice'`, `'Parallel'`, `'Map'`).
output_schema (dict): Expected output schema for the State. This is used to validate placeholder inputs used by the next state in the state machine. (default: None)
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
super(State, self).__init__(**kwargs)
self.fields['type'] = state_type
self.state_type = state_type
self.state_id = state_id
self.output_schema = output_schema
self.step_output = StepInput(schema=output_schema)
self.retries = []
self.catches = []
self.next_step = None
self.in_chain = None