Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:param fatal: if False, will return False when an exception.Forbidden
occurs.
:raises nova.exception.Forbidden: if verification fails and fatal is
True.
:return: returns a non-False value (not necessarily "True") if
authorized and False if not authorized and fatal is False.
"""
if target is None:
target = {'project_id': self.project_id,
'user_id': self.user_id}
try:
return policy.authorize(self, action, target)
except exception.Forbidden:
if fatal:
raise
return False
def _check_host_is_up(self, host):
    """Ensure the compute service registered for ``host`` is up.

    The service record is looked up by compute host and handed to the
    servicegroup API for a liveness check.

    :param host: compute host whose service record is looked up.
    :raises exception.ComputeServiceUnavailable: if the servicegroup API
        does not report the service as up.
    """
    compute_service = objects.Service.get_by_compute_host(self.context, host)
    service_alive = self.servicegroup_api.service_is_up(compute_service)
    if service_alive:
        return
    raise exception.ComputeServiceUnavailable(host=host)
:param shared: Specifies if storage is dedicated or shared.
For shared storage disks split into partitions
"""
volume_params = []
for node in storage:
name = node.get('drive_name', None)
num_disks = node.get('num_drives', 1)
if name is None:
msg = _("drive_name not defined")
raise exception.InvalidVolumeType(reason=msg)
try:
vol_type = volume_types.get_volume_type_by_name(context, name)
except exception.NotFound:
msg = _("invalid drive type name %s")
raise exception.InvalidVolumeType(reason=msg % name)
self._check_volume_type_correctness(vol_type)
# if size field present - override disk size specified in DB
size = int(node.get('size',
vol_type['extra_specs'].get('drive_size')))
if shared:
part_size = FLAGS.vsa_part_size_gb
total_capacity = num_disks * size
num_volumes = total_capacity / part_size
size = part_size
else:
num_volumes = num_disks
def create(self, label=None, cidr=None, num_networks=None,
           network_size=None, multi_host=None, vlan=None,
           vlan_start=None, vpn_start=None, cidr_v6=None, gateway=None,
           gateway_v6=None, bridge=None, bridge_interface=None,
           dns1=None, dns2=None, project_id=None, priority=None,
           uuid=None, fixed_cidr=None):
    """Create networks through the configured network manager.

    The arguments are validated, then every argument that has a truthy
    value is forwarded as a keyword argument to ``create_networks`` on
    the manager class named by ``CONF.network_manager``, using an admin
    context. ``multi_host`` is special-cased: whenever it is not None it
    is forwarded as the boolean ``multi_host == 'T'``.

    :raises exception.NetworkNotCreated: if ``label`` is empty, or if
        neither ``cidr`` nor ``cidr_v6`` is given.
    :raises exception.LabelTooLong: if ``label`` exceeds 255 characters.
    """
    # NOTE(gmann): These checks live here because the API layer performs
    # the equivalent validation through JSON schema.
    if not label:
        raise exception.NetworkNotCreated(req="label")
    if len(label) > 255:
        raise exception.LabelTooLong()
    if not cidr and not cidr_v6:
        raise exception.NetworkNotCreated(req="cidr or cidr_v6")

    # Every caller-visible argument, in signature order; falsy values
    # (None, '', 0, ...) are left out of the forwarded keyword arguments.
    supplied = (
        ('label', label),
        ('cidr', cidr),
        ('num_networks', num_networks),
        ('network_size', network_size),
        ('multi_host', multi_host),
        ('vlan', vlan),
        ('vlan_start', vlan_start),
        ('vpn_start', vpn_start),
        ('cidr_v6', cidr_v6),
        ('gateway', gateway),
        ('gateway_v6', gateway_v6),
        ('bridge', bridge),
        ('bridge_interface', bridge_interface),
        ('dns1', dns1),
        ('dns2', dns2),
        ('project_id', project_id),
        ('priority', priority),
        ('uuid', uuid),
        ('fixed_cidr', fixed_cidr),
    )
    create_args = {name: value for name, value in supplied if value}

    # Any non-None multi_host is normalised to a boolean, including falsy
    # values that the filter above dropped.
    if multi_host is not None:
        create_args['multi_host'] = (multi_host == 'T')

    manager = importutils.import_object(CONF.network_manager)
    manager.create_networks(context.get_admin_context(), **create_args)
self.address = node['pm_address']
self.user = node['pm_user']
self.password = node['pm_password']
self.port = node['terminal_port']
if self.node_id == None:
raise exception.InvalidParameterValue(_("Node id not supplied "
"to IPMI"))
if self.address == None:
raise exception.InvalidParameterValue(_("Address not supplied "
"to IPMI"))
if self.user == None:
raise exception.InvalidParameterValue(_("User not supplied "
"to IPMI"))
if self.password == None:
raise exception.InvalidParameterValue(_("Password not supplied "
"to IPMI"))
def _volume_driver_method(self, method_name, connection_info,
                          *args, **kwargs):
    """Dispatch ``method_name`` to the driver for this connection's type.

    The driver is selected from ``self.volume_drivers`` using the
    ``'driver_volume_type'`` entry of ``connection_info``.

    :param method_name: name of the attribute to call on the driver.
    :param connection_info: mapping read for ``'driver_volume_type'``; it
        is also passed to the driver method as its first argument.
    :param args: extra positional arguments passed through unchanged.
    :param kwargs: extra keyword arguments passed through unchanged.
    :returns: whatever the driver method returns.
    :raises exception.VolumeDriverNotFound: if no driver is registered
        for the connection's volume type.
    """
    volume_type = connection_info.get('driver_volume_type')
    if volume_type in self.volume_drivers:
        handler = getattr(self.volume_drivers[volume_type], method_name)
        return handler(connection_info, *args, **kwargs)
    raise exception.VolumeDriverNotFound(driver_type=volume_type)
policies = set(('anti-affinity', 'affinity', 'soft-affinity',
'soft-anti-affinity'))
if group.policy in policies:
if not _SUPPORTS_AFFINITY and 'affinity' == group.policy:
msg = _("ServerGroupAffinityFilter not configured")
LOG.error(msg)
raise exception.UnsupportedPolicyException(reason=msg)
if not _SUPPORTS_ANTI_AFFINITY and 'anti-affinity' == group.policy:
msg = _("ServerGroupAntiAffinityFilter not configured")
LOG.error(msg)
raise exception.UnsupportedPolicyException(reason=msg)
if (not _SUPPORTS_SOFT_AFFINITY and 'soft-affinity' == group.policy):
msg = _("ServerGroupSoftAffinityWeigher not configured")
LOG.error(msg)
raise exception.UnsupportedPolicyException(reason=msg)
if (not _SUPPORTS_SOFT_ANTI_AFFINITY and
'soft-anti-affinity' == group.policy):
msg = _("ServerGroupSoftAntiAffinityWeigher not configured")
LOG.error(msg)
raise exception.UnsupportedPolicyException(reason=msg)
group_hosts = set(group.get_hosts())
user_hosts = set(user_group_hosts) if user_group_hosts else set()
return GroupDetails(hosts=user_hosts | group_hosts,
policy=group.policy, members=group.members)
availability_zone = vol.get('availability_zone', None)
try:
new_volume = self.volume_api.create(
context,
size,
vol.get('display_name'),
vol.get('display_description'),
snapshot=snapshot,
volume_type=vol_type,
metadata=metadata,
availability_zone=availability_zone
)
except exception.InvalidInput as err:
raise exc.HTTPBadRequest(explanation=err.format_message())
except exception.OverQuota as err:
raise exc.HTTPForbidden(explanation=err.format_message())
# TODO(vish): Instance should be None at db layer instead of
# trying to lazy load, but for now we turn it into
# a dict to avoid an error.
retval = _translate_volume_detail_view(context, dict(new_volume))
result = {'volume': retval}
location = '%s/%s' % (req.url, new_volume['id'])
return wsgi.ResponseObject(result, headers=dict(location=location))
def address_to_hosts(addresses):
    """Return the host addresses described by ``addresses``.

    The value is first tried as one individual address, which yields a
    one-element list. If ``netaddr.IPAddress`` rejects it with
    ``ValueError``, it is parsed as a network instead and the result of
    the network's ``iter_hosts()`` is returned.

    :param addresses: a single address or a network specification.
    :raises exception.InvalidInput: if the network holds fewer than 4
        addresses, or 1000000 or more.
    """
    try:
        return [netaddr.IPAddress(addresses)]
    except ValueError:
        network = netaddr.IPNetwork(addresses)

        if network.size < 4:
            raise exception.InvalidInput(
                reason=_("/%s should be specified as single address(es) "
                         "not in cidr format") % network.prefixlen)

        if network.size >= 1000000:
            # NOTE(dripton): If we generate a million IPs and put them in
            # the database, the system will slow to a crawl and/or run
            # out of memory and crash. This is clearly a misconfiguration.
            raise exception.InvalidInput(
                reason=_("Too many IP addresses will be generated. Please "
                         "increase /%s to reduce the number generated."
                         ) % network.prefixlen)

        return network.iter_hosts()
def _get_vm_serial_port_mapping(self):
serial_port_conns = self._vmutils.get_vm_serial_port_connections(
self._instance_name)
if not serial_port_conns:
err_msg = _("No suitable serial port pipe was found "
"for instance %(instance_name)s")
raise exception.NovaException(
err_msg % {'instance_name': self._instance_name})
serial_port_mapping = {}
# At the moment, we tag the pipes by using a pipe path suffix
# as we can't use the serial port ElementName attribute because of
# a Hyper-V bug.
for pipe_path in serial_port_conns:
# expected pipe_path:
# '\\.\pipe\fc1bcc91-c7d3-4116-a210-0cd151e019cd_rw'
port_type = pipe_path[-2:]
if port_type in [constants.SERIAL_PORT_TYPE_RO,
constants.SERIAL_PORT_TYPE_RW]:
serial_port_mapping[port_type] = pipe_path
else:
serial_port_mapping[constants.SERIAL_PORT_TYPE_RW] = pipe_path