Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_get_endpoint(self):
auth_plugin = authv1.PasswordPlugin(**self.options)
object_store_endpoint = auth_plugin.get_endpoint(
self.mock_session, service_type='object-store')
self.assertEqual(object_store_endpoint, self.expected_endpoint)
auth_endpoint = auth_plugin.get_endpoint(
self.mock_session, interface=plugin.AUTH_INTERFACE)
self.assertEqual(auth_endpoint, self.options['auth_url'])
with self.assertRaises(exceptions.EndpointNotFound) as exc_mgr:
auth_plugin.get_endpoint(self.mock_session)
self.assertEqual('public endpoint for None service not found',
str(exc_mgr.exception))
with self.assertRaises(exceptions.EndpointNotFound) as exc_mgr:
auth_plugin.get_endpoint(
self.mock_session, service_type='identity', region_name='DFW')
self.assertEqual(
'public endpoint for identity service in DFW region not found',
str(exc_mgr.exception))
with self.assertRaises(exceptions.EndpointNotFound) as exc_mgr:
auth_plugin.get_endpoint(
self.mock_session, service_type='image', service_name='glance')
self.assertEqual(
'public endpoint for image service named glance not found',
self.log.critical('Unable to validate token: %s', e)
if self._delay_auth_decision:
self.log.debug('Keystone unavailable; marking token as '
'invalid and deferring auth decision.')
raise ksm_exceptions.InvalidToken(
'Keystone unavailable: %s' % e)
raise webob.exc.HTTPServiceUnavailable(
'The Keystone service is temporarily unavailable.')
except ksm_exceptions.InvalidToken:
self.log.debug('Token validation failure.', exc_info=True)
if token_hashes:
self._token_cache.set(token_hashes[0],
_CACHE_INVALID_INDICATOR)
self.log.warning('Authorization failed for token')
raise
except ksa_exceptions.EndpointNotFound:
# Invalidate auth in adapter for identity endpoint update
self._identity_server.invalidate()
raise
return data
max_version = pkg_resources.parse_version(
versions["versions"][0]["max_version"])
needs_version = pkg_resources.parse_version(
MIN_PLACEMENT_MICROVERSION)
if max_version < needs_version:
msg = (_('Placement API version %(needed)s needed, '
'you have %(current)s.') %
{'needed': needs_version, 'current': max_version})
return upgradecheck.Result(upgradecheck.Code.FAILURE, msg)
except ks_exc.MissingAuthPlugin:
msg = _('No credentials specified for placement API in nova.conf.')
return upgradecheck.Result(upgradecheck.Code.FAILURE, msg)
except ks_exc.Unauthorized:
msg = _('Placement service credentials do not work.')
return upgradecheck.Result(upgradecheck.Code.FAILURE, msg)
except ks_exc.EndpointNotFound:
msg = _('Placement API endpoint not found.')
return upgradecheck.Result(upgradecheck.Code.FAILURE, msg)
except ks_exc.DiscoveryFailure:
msg = _('Discovery for placement API URI failed.')
return upgradecheck.Result(upgradecheck.Code.FAILURE, msg)
except ks_exc.NotFound:
msg = _('Placement API does not seem to be running.')
return upgradecheck.Result(upgradecheck.Code.FAILURE, msg)
return upgradecheck.Result(upgradecheck.Code.SUCCESS)
'named %(service_name)s not found' %
{'interface': interface,
'service_type': service_type,
'service_name': service_name})
elif region_name:
msg = ('%(interface)s endpoint for %(service_type)s service '
'in %(region_name)s region not found' %
{'interface': interface,
'service_type': service_type, 'region_name': region_name})
else:
msg = ('%(interface)s endpoint for %(service_type)s service '
'not found' %
{'interface': interface,
'service_type': service_type})
raise exceptions.EndpointNotFound(msg)
the instance is generated and returned.
This, depending on the configuration, might
mean a call to Keystone. If false, None
value is returned in the dict at the
image_ref_url key.
"""
image_ref_url = None
if populate_image_ref_url:
try:
# NOTE(mriedem): We can eventually drop this when we no longer
# support legacy notifications since versioned notifications don't
# use this.
image_ref_url = image_api.API().generate_image_url(
instance.image_ref, context)
except ks_exc.EndpointNotFound:
# We might be running from a periodic task with no auth token and
# CONF.glance.api_servers isn't set, so we can't get the image API
# endpoint URL from the service catalog, therefore just use the
# image id for the URL (yes it's a lie, but it's best effort at
# this point).
with excutils.save_and_reraise_exception() as exc_ctx:
if context.auth_token is None:
image_ref_url = instance.image_ref
exc_ctx.reraise = False
instance_type = instance.get_flavor()
instance_type_name = instance_type.get('name', '')
instance_flavorid = instance_type.get('flavorid', '')
instance_info = dict(
# Owner properties
def check_cinder_exists():
service_type = 'volumev2'
try:
base.url_for(context.current().service_catalog, service_type,
endpoint_type=CONF.cinder.endpoint_type)
return True
except keystone_exceptions.EndpointNotFound:
return False
try:
return super(Command, self).run(parsed_args)
except exceptions.RemoteError as e:
columns = ['Code', 'Type']
values = [e.code, e.type]
if e.message:
columns.append('Message')
values.append(e.message)
if e.errors:
columns.append('Errors')
values.append(e.errors)
self.error_output(parsed_args, columns, values)
except ks_exceptions.EndpointNotFound as e:
self.app.log.error('No endpoint was found. You must provide a '
'username or user id via --os-username, '
'--os-user-id, env[OS_USERNAME] or '
'env[OS_USER_ID]. You may also be using a '
'cloud that does not have the V1 API enabled. '
'If your cloud does not have the V1 DNS API '
'use the openstack CLI to interact with the '
'DNS Service.')
return 1
def _get_endpoint(conf, ksclient):
# we store the endpoint as a base class attribute, so keystone is
# only ever called once
if _Base._ENDPOINT is None:
try:
creds = conf.service_credentials
_Base._ENDPOINT = keystone_client.get_service_catalog(
ksclient).url_for(
service_type=conf.service_types.swift,
interface=creds.interface,
region_name=creds.region_name)
except exceptions.EndpointNotFound as e:
LOG.info("Swift endpoint not found: %s", e)
return _Base._ENDPOINT
# because its get_endpoint() method isn't yet set up to handle interface as
# a list. (It could also happen with a real auth if the endpoint isn't
# there; but that's covered below.)
try:
return ksa_adapter.get_endpoint()
except ks_exc.EndpointNotFound:
pass
interfaces = list(ksa_adapter.interface)
for interface in interfaces:
ksa_adapter.interface = interface
try:
return ksa_adapter.get_endpoint()
except ks_exc.EndpointNotFound:
pass
raise ks_exc.EndpointNotFound(
"Could not find requested endpoint for any of the following "
"interfaces: %s" % interfaces)
# if we are passed a fully qualified URL and an endpoint_filter we
# should ignore the filter. This will make it easier for clients who
# want to overrule the default endpoint_filter data added to all client
# requests. We check fully qualified here by the presence of a host.
if not urllib.parse.urlparse(url).netloc:
base_url = None
if endpoint_override:
base_url = endpoint_override % _StringFormatter(self, auth)
elif endpoint_filter:
base_url = self.get_endpoint(auth, allow=allow,
**endpoint_filter)
if not base_url:
raise exceptions.EndpointNotFound()
url = '%s/%s' % (base_url.rstrip('/'), url.lstrip('/'))
if self.cert:
kwargs.setdefault('cert', self.cert)
if self.timeout is not None:
kwargs.setdefault('timeout', self.timeout)
if user_agent:
headers['User-Agent'] = user_agent
elif self.user_agent:
user_agent = headers.setdefault('User-Agent', self.user_agent)
else:
# Per RFC 7231 Section 5.5.3, identifiers in a user-agent should be
# ordered by decreasing significance. If a user sets their product