Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
properties.Schema.STRING,
_('Description of the pool.'),
update_allowed=True
),
ADMIN_STATE_UP: properties.Schema(
properties.Schema.BOOLEAN,
_('The administrative state of this pool.'),
default=True,
update_allowed=True
),
PROVIDER: properties.Schema(
properties.Schema.STRING,
_('LBaaS provider to implement this load balancer instance.'),
support_status=support.SupportStatus(version='5.0.0'),
constraints=[
constraints.CustomConstraint('neutron.lb.provider')
],
),
VIP: properties.Schema(
properties.Schema.MAP,
_('IP address and port of the pool.'),
schema={
VIP_NAME: properties.Schema(
properties.Schema.STRING,
_('Name of the vip.')
),
VIP_DESCRIPTION: properties.Schema(
properties.Schema.STRING,
_('Description of the vip.')
),
VIP_SUBNET: properties.Schema(
properties.Schema.STRING,
],
default=None,
update_allowed=True
),
KEY_SIZE: properties.Schema(
properties.Schema.INTEGER,
_('Size of encryption key, in bits. '
'For example, 128 or 256.'),
default=None,
update_allowed=True
),
VOLUME_TYPE: properties.Schema(
properties.Schema.STRING,
_('Name or id of volume type (OS::Cinder::VolumeType).'),
required=True,
constraints=[constraints.CustomConstraint('cinder.vtype')]
),
}
def _get_vol_type_id(self, volume_type):
id = self.client_plugin().get_volume_type(volume_type)
return id
def handle_create(self):
body = {
'provider': self.properties[self.PROVIDER],
'cipher': self.properties[self.CIPHER],
'key_size': self.properties[self.KEY_SIZE],
'control_location': self.properties[self.CONTROL_LOCATION]
}
vol_type_id = self._get_vol_type_id(self.properties[self.VOLUME_TYPE])
DESCRIPTION: properties.Schema(
properties.Schema.STRING,
_('Description for the firewall rule.'),
update_allowed=True
),
SHARED: properties.Schema(
properties.Schema.BOOLEAN,
_('Whether this rule should be shared across all tenants.'),
default=False,
update_allowed=True
),
PROTOCOL: properties.Schema(
properties.Schema.STRING,
_('Protocol for the firewall rule.'),
constraints=[
constraints.AllowedValues(['tcp', 'udp', 'icmp', 'any']),
],
default='any',
update_allowed=True,
),
IP_VERSION: properties.Schema(
properties.Schema.STRING,
_('Internet protocol version.'),
default='4',
constraints=[
constraints.AllowedValues(['4', '6']),
],
update_allowed=True
),
SOURCE_IP_ADDRESS: properties.Schema(
properties.Schema.STRING,
_('Source IP address or CIDR.'),
properties_schema = {
NAME: properties.Schema(
properties.Schema.STRING,
_('Name for the Port Pair.'),
update_allowed=True
),
DESCRIPTION: properties.Schema(
properties.Schema.STRING,
_('Description for the Port Pair.'),
update_allowed=True
),
INGRESS: properties.Schema(
properties.Schema.STRING,
_('ID or name of the ingress neutron port.'),
constraints=[constraints.CustomConstraint('neutron.port')],
required=True,
),
EGRESS: properties.Schema(
properties.Schema.STRING,
_('ID or name of the egress neutron port.'),
constraints=[constraints.CustomConstraint('neutron.port')],
required=True,
),
SERVICE_FUNCTION_PARAMETERS: properties.Schema(
properties.Schema.MAP,
_('Dictionary of service function parameter. Currently '
'only correlation=None is supported.'),
default={'correlation': None},
),
}
'name', 'domain', 'description', 'enabled', 'email', 'password',
'default_project', 'groups'
)
properties_schema = {
NAME: properties.Schema(
properties.Schema.STRING,
_('Name of keystone user.'),
update_allowed=True
),
DOMAIN: properties.Schema(
properties.Schema.STRING,
_('Name or ID of keystone domain.'),
default='default',
update_allowed=True,
constraints=[constraints.CustomConstraint('keystone.domain')]
),
DESCRIPTION: properties.Schema(
properties.Schema.STRING,
_('Description of keystone user.'),
default='',
update_allowed=True
),
ENABLED: properties.Schema(
properties.Schema.BOOLEAN,
_('Keystone user is enabled or disabled.'),
default=True,
update_allowed=True
),
EMAIL: properties.Schema(
properties.Schema.STRING,
_('Email address of keystone user.'),
) = (
'Character_set', 'Collate', 'Name',
)
_USER_KEYS = (
USER_NAME, USER_PASSWORD, USER_HOST, USER_DATABASES,
) = (
'Name', 'Password', 'Host', 'Databases',
)
properties_schema = {
INSTANCE_NAME: properties.Schema(
properties.Schema.STRING,
required=True,
constraints=[
constraints.Length(max=255),
]
),
FLAVOR_REF: properties.Schema(
properties.Schema.STRING,
required=True
),
VOLUME_SIZE: properties.Schema(
properties.Schema.NUMBER,
required=True,
constraints=[
constraints.Range(1, 150),
]
),
DATABASES: properties.Schema(
properties.Schema.LIST,
schema=properties.Schema(
constraints=[
constraints.CustomConstraint('keystone.project')
]
),
SUBNET: properties.Schema(
properties.Schema.INTEGER,
_('Quota for the number of subnets. '
'Setting -1 means unlimited.'),
constraints=[constraints.Range(min=-1)],
update_allowed=True
),
NETWORK: properties.Schema(
properties.Schema.INTEGER,
_('Quota for the number of networks. '
'Setting -1 means unlimited.'),
constraints=[constraints.Range(min=-1)],
update_allowed=True
),
FLOATINGIP: properties.Schema(
properties.Schema.INTEGER,
_('Quota for the number of floating IPs. '
'Setting -1 means unlimited.'),
constraints=[constraints.Range(min=-1)],
update_allowed=True
),
SECURITY_GROUP_RULE: properties.Schema(
properties.Schema.INTEGER,
_('Quota for the number of security group rules. '
'Setting -1 means unlimited.'),
constraints=[constraints.Range(min=-1)],
update_allowed=True
),
'subnet'
)
ATTRIBUTES = (
ADDRESS_ATTR, POOL_ID_ATTR
) = (
'address', 'pool_id'
)
properties_schema = {
POOL: properties.Schema(
properties.Schema.STRING,
_('Name or ID of the load balancing pool.'),
required=True,
constraints=[
constraints.CustomConstraint('neutron.lbaas.pool')
]
),
ADDRESS: properties.Schema(
properties.Schema.STRING,
_('IP address of the pool member on the pool network.'),
required=True,
constraints=[
constraints.CustomConstraint('ip_addr')
]
),
PROTOCOL_PORT: properties.Schema(
properties.Schema.INTEGER,
_('Port on which the pool member listens for requests or '
'connections.'),
required=True,
constraints=[
)
properties_schema = {
# Based on RFC 1035, length of name is set to max of 255
NAME: properties.Schema(
properties.Schema.STRING,
_('Domain name.'),
required=True,
constraints=[constraints.Length(max=255)]
),
# Based on RFC 1035, range for ttl is set to 1 to signed 32 bit number
TTL: properties.Schema(
properties.Schema.INTEGER,
_('Time To Live (Seconds).'),
update_allowed=True,
constraints=[constraints.Range(min=1,
max=2147483647)]
),
# designate mandates to the max length of 160 for description
DESCRIPTION: properties.Schema(
properties.Schema.STRING,
_('Description of domain.'),
update_allowed=True,
constraints=[constraints.Length(max=160)]
),
EMAIL: properties.Schema(
properties.Schema.STRING,
_('Domain email.'),
update_allowed=True,
required=True
)
}
]
),
}
)
properties_schema.update(
{
server.Server.SOFTWARE_CONFIG_TRANSPORT: properties.Schema(
properties.Schema.STRING,
_('How the server should receive the metadata required for '
'software configuration. POLL_TEMP_URL is the only '
'supported transport on Rackspace Cloud. This property is '
'retained for compatibility.'),
default=server.Server.POLL_TEMP_URL,
update_allowed=True,
constraints=[
constraints.AllowedValues([
server.Server.POLL_TEMP_URL
])
]
),
}
)
def __init__(self, name, json_snippet, stack):
super(CloudServer, self).__init__(name, json_snippet, stack)
self._managed_cloud_started_event_sent = False
self._rack_connect_started_event_sent = False
def _config_drive(self):
user_data_format = self.properties[self.USER_DATA_FORMAT]
is_sw_config = user_data_format == self.SOFTWARE_CONFIG
user_data = self.properties.get(self.USER_DATA)