Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def wrapper(self, *a, **k):
try:
return f(self, *a, **k)
except ks_exc.EndpointNotFound:
warn_limit(
self, 'The placement API endpoint was not found.')
# Reset client session so there is a new catalog, which
# gets cached when keystone is first successfully contacted.
self._client = self._create_client()
except ks_exc.MissingAuthPlugin:
warn_limit(
self, 'No authentication information found for placement API.')
except ks_exc.Unauthorized:
warn_limit(
self, 'Placement service credentials do not work.')
except ks_exc.DiscoveryFailure:
# TODO(_gryf): Looks like DiscoveryFailure is not the only missing
# exception here. In Pike we should take care about keystoneauth1
# failures handling globally.
warn_limit(self,
'Discovering suitable URL for placement API failed.')
except ks_exc.ConnectFailure:
LOG.warning('Placement API service is not responding.')
return wrapper
# The version we started with is correct, and we don't want
# new data
return
if vers_url in tried:
continue
tried.add(vers_url)
try:
self._disc = get_discovery(
session, vers_url,
cache=cache,
authenticated=False)
break
except (exceptions.DiscoveryFailure,
exceptions.HttpError,
exceptions.ConnectionError):
continue
if not self._disc:
# We couldn't find a version discovery document anywhere.
if self._catalog_matches_version:
# But - the version in the catalog is fine.
self.service_url = self.catalog_url
return
# NOTE(jamielennox): The logic here is required for backwards
# compatibility. By itself it is not ideal.
if allow_version_hack:
# NOTE(jamielennox): If we can't contact the server we
# fall back to just returning the URL from the catalog. This
# is backwards compatible behaviour and used when there is no
# other choice. Realistically if you have provided a version
:param user_token: user's token id
:param retry: flag that forces the middleware to retry
user authentication when an indeterminate
response is received. Optional.
:param allow_expired: Allow retrieving an expired token.
:returns: access info received from identity server on success
:rtype: :py:class:`keystoneauth1.access.AccessInfo`
:raises exc.InvalidToken: if token is rejected
:raises exc.ServiceError: if unable to authenticate token
"""
try:
auth_ref = self._request_strategy.verify_token(
user_token,
allow_expired=allow_expired)
except ksa_exceptions.NotFound as e:
self._LOG.info('Authorization failed for token')
self._LOG.info('Identity response: %s', e.response.text)
raise ksm_exceptions.InvalidToken(_('Token authorization failed'))
except ksa_exceptions.Unauthorized as e:
self._LOG.info('Identity server rejected authorization')
self._LOG.warning('Identity response: %s', e.response.text)
if retry:
self._LOG.info('Retrying validation')
return self.verify_token(user_token, False)
msg = _('Identity server rejected authorization necessary to '
'fetch token data')
raise ksm_exceptions.ServiceError(msg)
except ksa_exceptions.HttpError as e:
self._LOG.error(
'Bad response code while validating token: %s %s',
e.http_status, e.message)
def operator_cloud(
config=None, strict=False, app_name=None, app_version=None, **kwargs):
if not config:
config = _get_openstack_config(app_name, app_version)
try:
cloud_config = config.get_one_cloud(**kwargs)
except keystoneauth1.exceptions.auth_plugins.NoMatchingPlugin as e:
raise OpenStackCloudException(
"Invalid cloud configuration: {exc}".format(exc=str(e)))
return OperatorCloud(cloud_config=cloud_config, strict=strict)
def take_action(self, parsed_args):
identity_client = self.app.client_manager.identity
info = {}
try:
project = utils.find_resource(
identity_client.tenants,
parsed_args.project,
)
info.update(project._info)
except ks_exc.Forbidden:
auth_ref = self.app.client_manager.auth_ref
if (
parsed_args.project == auth_ref.project_id or
parsed_args.project == auth_ref.project_name
):
# Ask for currently auth'ed project so return it
info = {
'id': auth_ref.project_id,
'name': auth_ref.project_name,
# True because we don't get this far if it is disabled
'enabled': True,
}
else:
raise
# TODO(stevemar): Remove the line below when we support multitenancy
def get_sp_url(self, session, sp_id, **kwargs):
try:
return self.get_access(
session).service_providers.get_sp_url(sp_id)
except exceptions.ServiceProviderNotFound:
return None
value=None,
attr=None,
):
"""Find a single resource by name or ID
:param string path:
The API-specific portion of the URL path
:param string value:
search expression
:param string attr:
name of attribute for secondary search
"""
try:
ret = self._request('GET', "/%s/%s" % (path, value)).json()
except ks_exceptions.NotFound:
kwargs = {attr: value}
try:
ret = self.find_one("/%s/detail" % (path), **kwargs)
except ks_exceptions.NotFound:
msg = _("%s not found") % value
raise exceptions.NotFound(msg)
return ret
def _get_service_provider(self, sp_id):
try:
return self._service_providers[sp_id]
except KeyError:
raise exceptions.ServiceProviderNotFound(sp_id)
def map_exceptions_wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except _exceptions.HttpError as e:
raise exceptions.from_exception(e)
except _exceptions.ClientException as e:
raise exceptions.SDKException(message=e.message, cause=e)
MIN_PLACEMENT_MICROVERSION)
if max_version < needs_version:
msg = (_('Placement API version %(needed)s needed, '
'you have %(current)s.') %
{'needed': needs_version, 'current': max_version})
return upgradecheck.Result(upgradecheck.Code.FAILURE, msg)
except ks_exc.MissingAuthPlugin:
msg = _('No credentials specified for placement API in nova.conf.')
return upgradecheck.Result(upgradecheck.Code.FAILURE, msg)
except ks_exc.Unauthorized:
msg = _('Placement service credentials do not work.')
return upgradecheck.Result(upgradecheck.Code.FAILURE, msg)
except ks_exc.EndpointNotFound:
msg = _('Placement API endpoint not found.')
return upgradecheck.Result(upgradecheck.Code.FAILURE, msg)
except ks_exc.DiscoveryFailure:
msg = _('Discovery for placement API URI failed.')
return upgradecheck.Result(upgradecheck.Code.FAILURE, msg)
except ks_exc.NotFound:
msg = _('Placement API does not seem to be running.')
return upgradecheck.Result(upgradecheck.Code.FAILURE, msg)
return upgradecheck.Result(upgradecheck.Code.SUCCESS)