Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
}
if s[rpc_api.STACK_UPDATED_TIME] is not None:
keymap[rpc_api.STACK_UPDATED_TIME] = 'LastUpdatedTime'
result = api_utils.reformat_dict_keys(keymap, s)
action = s[rpc_api.STACK_ACTION]
status = s[rpc_api.STACK_STATUS]
result['StackStatus'] = '_'.join((action, status))
# Reformat outputs, these are handled separately as they are
# only present in the engine output for a completely created
# stack
result['Outputs'] = []
if rpc_api.STACK_OUTPUTS in s:
for o in s[rpc_api.STACK_OUTPUTS]:
result['Outputs'].append(format_stack_outputs(o))
# Reformat Parameters dict-of-dict into AWS API format
# This is a list-of-dict with nasty "ParameterKey" : key
# "ParameterValue" : value format.
result['Parameters'] = [{'ParameterKey': k,
'ParameterValue': v}
for (k, v) in result['Parameters'].items()]
return self._id_format(result)
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
from heat.api.openstack.v1 import util
from heat.api.openstack.v1.views import views_common
from heat.rpc import api as rpc_api
_collection_name = 'stacks'
basic_keys = (
rpc_api.STACK_ID,
rpc_api.STACK_NAME,
rpc_api.STACK_DESCRIPTION,
rpc_api.STACK_STATUS,
rpc_api.STACK_STATUS_DATA,
rpc_api.STACK_CREATION_TIME,
rpc_api.STACK_DELETION_TIME,
rpc_api.STACK_UPDATED_TIME,
rpc_api.STACK_OWNER,
rpc_api.STACK_PARENT,
rpc_api.STACK_USER_PROJECT_ID,
rpc_api.STACK_TAGS,
)
def format_stack(req, stack, keys=None, include_project=False):
def transform(key, value):
def prepare_args(self, data, is_update=False):
args = data.args()
key = rpc_api.PARAM_TIMEOUT
if key in args:
args[key] = self._extract_int_param(key, args[key])
key = rpc_api.PARAM_TAGS
if args.get(key) is not None:
args[key] = self._extract_tags_param(args[key])
key = rpc_api.PARAM_CONVERGE
if not is_update and key in args:
msg = _("%s flag only supported in stack update (or update "
"preview) request.") % key
raise exc.HTTPBadRequest(str(msg))
return args
elif isinstance(c, constr.Range):
if c.min is not None:
res[rpc_api.PARAM_MIN_VALUE] = c.min
if c.max is not None:
res[rpc_api.PARAM_MAX_VALUE] = c.max
elif isinstance(c, constr.Modulo):
if c.step is not None:
res[rpc_api.PARAM_STEP] = c.step
if c.offset is not None:
res[rpc_api.PARAM_OFFSET] = c.offset
elif isinstance(c, constr.AllowedValues):
res[rpc_api.PARAM_ALLOWED_VALUES] = list(c.allowed)
elif isinstance(c, constr.AllowedPattern):
res[rpc_api.PARAM_ALLOWED_PATTERN] = c.pattern
elif isinstance(c, constr.CustomConstraint):
res[rpc_api.PARAM_CUSTOM_CONSTRAINT] = c.name
if c.description:
constraint_description.append(c.description)
if constraint_description:
res[rpc_api.PARAM_CONSTRAINT_DESCRIPTION] = " ".join(
constraint_description)
derived_inputs = self._build_derived_inputs(action, source)
derived_options = self._build_derived_options(action, source)
derived_config = self._build_derived_config(
action, source, derived_inputs, derived_options)
derived_name = (self.properties.get(self.NAME) or
source.get(rpc_api.SOFTWARE_CONFIG_NAME))
return {
rpc_api.SOFTWARE_CONFIG_GROUP:
source.get(rpc_api.SOFTWARE_CONFIG_GROUP) or 'Heat::Ungrouped',
rpc_api.SOFTWARE_CONFIG_CONFIG:
derived_config or self.empty_config(),
rpc_api.SOFTWARE_CONFIG_OPTIONS: derived_options,
rpc_api.SOFTWARE_CONFIG_INPUTS:
[i.as_dict() for i in derived_inputs],
rpc_api.SOFTWARE_CONFIG_OUTPUTS:
[o.as_dict() for o in source[rpc_api.SOFTWARE_CONFIG_OUTPUTS]],
rpc_api.SOFTWARE_CONFIG_NAME:
derived_name or self.physical_resource_name()
}
return str(self.name)
if STACK_ID_OUTPUT in self.attributes.cached_attrs:
return self.attributes.cached_attrs[STACK_ID_OUTPUT]
stack_identity = self.nested_identifier()
reference_id = stack_identity.arn()
try:
if self._outputs is not None:
reference_id = self.get_output(STACK_ID_OUTPUT)
elif STACK_ID_OUTPUT in self.attributes:
output = self.rpc_client().show_output(self.context,
dict(stack_identity),
STACK_ID_OUTPUT)
if rpc_api.OUTPUT_ERROR in output:
raise exception.TemplateOutputError(
resource=self.name,
attribute=STACK_ID_OUTPUT,
message=output[rpc_api.OUTPUT_ERROR])
reference_id = output[rpc_api.OUTPUT_VALUE]
except exception.TemplateOutputError as err:
LOG.info('%s', err)
except exception.NotFound:
pass
self.attributes.set_cached_attr(STACK_ID_OUTPUT, reference_id)
return reference_id
stack_keys.update(filter_param_types)
filter_params = util.get_allowed_params(req.params, stack_keys)
show_deleted = False
p_name = rpc_api.PARAM_SHOW_DELETED
if p_name in params:
params[p_name] = self._extract_bool_param(p_name, params[p_name])
show_deleted = params[p_name]
show_nested = False
p_name = rpc_api.PARAM_SHOW_NESTED
if p_name in params:
params[p_name] = self._extract_bool_param(p_name, params[p_name])
show_nested = params[p_name]
key = rpc_api.PARAM_LIMIT
if key in params:
params[key] = self._extract_int_param(key, params[key])
show_hidden = False
p_name = rpc_api.PARAM_SHOW_HIDDEN
if p_name in params:
params[p_name] = self._extract_bool_param(p_name, params[p_name])
show_hidden = params[p_name]
tags = None
if rpc_api.PARAM_TAGS in params:
params[rpc_api.PARAM_TAGS] = self._extract_tags_param(
params[rpc_api.PARAM_TAGS])
tags = params[rpc_api.PARAM_TAGS]
tags_any = None
if (schema.implemented
and schema.support_status.status != support.HIDDEN):
yield name, dict(schema)
def attributes_schema():
for name, schema_data in itertools.chain(
resource_class.attributes_schema.items(),
resource_class.base_attributes_schema.items()):
schema = attributes.Schema.from_attribute(schema_data)
if schema.support_status.status != support.HIDDEN:
yield name, dict(schema)
result = {
rpc_api.RES_SCHEMA_RES_TYPE: type_name,
rpc_api.RES_SCHEMA_PROPERTIES: dict(properties_schema()),
rpc_api.RES_SCHEMA_ATTRIBUTES: dict(attributes_schema()),
rpc_api.RES_SCHEMA_SUPPORT_STATUS:
resource_class.support_status.to_dict()
}
if with_description:
result[rpc_api.RES_SCHEMA_DESCRIPTION] = resource_class.getdoc()
return result
p_name = rpc_api.PARAM_SHOW_HIDDEN
if p_name in params:
params[p_name] = self._extract_bool_param(p_name, params[p_name])
show_hidden = params[p_name]
tags = None
if rpc_api.PARAM_TAGS in params:
params[rpc_api.PARAM_TAGS] = self._extract_tags_param(
params[rpc_api.PARAM_TAGS])
tags = params[rpc_api.PARAM_TAGS]
tags_any = None
if rpc_api.PARAM_TAGS_ANY in params:
params[rpc_api.PARAM_TAGS_ANY] = self._extract_tags_param(
params[rpc_api.PARAM_TAGS_ANY])
tags_any = params[rpc_api.PARAM_TAGS_ANY]
not_tags = None
if rpc_api.PARAM_NOT_TAGS in params:
params[rpc_api.PARAM_NOT_TAGS] = self._extract_tags_param(
params[rpc_api.PARAM_NOT_TAGS])
not_tags = params[rpc_api.PARAM_NOT_TAGS]
not_tags_any = None
if rpc_api.PARAM_NOT_TAGS_ANY in params:
params[rpc_api.PARAM_NOT_TAGS_ANY] = self._extract_tags_param(
params[rpc_api.PARAM_NOT_TAGS_ANY])
not_tags_any = params[rpc_api.PARAM_NOT_TAGS_ANY]
# get the with_count value, if invalid, raise ValueError
with_count = False
if req.params.get('with_count'):
def handle_create(self):
cloud_config = template_format.yaml.dump(
self.properties[self.CLOUD_CONFIG],
Dumper=template_format.yaml_dumper)
props = {
rpc_api.SOFTWARE_CONFIG_NAME: self.physical_resource_name(),
rpc_api.SOFTWARE_CONFIG_CONFIG: '#cloud-config\n%s' % cloud_config,
rpc_api.SOFTWARE_CONFIG_GROUP: 'Heat::Ungrouped'
}
sc = self.rpc_client().create_software_config(self.context, **props)
self.resource_id_set(sc[rpc_api.SOFTWARE_CONFIG_ID])