def _create_auth_plugin(self):
    """Build the auth plugin from the middleware configuration.

    The option group is taken from ``auth_section`` and falls back to
    ``_base.AUTHTOKEN_GROUP``. When neither ``auth_type`` nor the deprecated
    ``auth_plugin`` paste override names a plugin, a legacy
    ``_auth.AuthTokenPlugin`` is constructed from the ``auth_*``,
    ``identity_uri`` and ``admin_*`` options of that group and returned.

    NOTE(review): only the legacy fallback is visible in this listing; the
    code that loads a named plugin is not shown.
    """
    # NOTE(jamielennox): Ideally this would use load_from_conf_options
    # however that is not possible because we have to support the override
    # pattern we use in _conf.get. This function therefore does a manual
    # version of load_from_conf_options with the fallback plugin inline.
    group = self._conf.get('auth_section') or _base.AUTHTOKEN_GROUP

    # NOTE(jamielennox): auth_plugin was deprecated to auth_type. _conf.get
    # doesn't handle that deprecation in the case of conf dict options so
    # we have to manually check the value
    plugin_name = (self._conf.get('auth_type', group=group)
                   or self._conf.paste_overrides.get('auth_plugin'))

    if not plugin_name:
        # Legacy fallback: the service credentials (admin_token, admin_user,
        # admin_password) are read from configuration and handed straight to
        # the plugin, so the configuration source holds secret material.
        # NOTE(review): self.log is passed in alongside those values --
        # confirm AuthTokenPlugin never writes them to the log.
        # NOTE(review): auth_protocol presumably selects the scheme used to
        # reach the identity service; verify it is https wherever these
        # credentials are sent.
        return _auth.AuthTokenPlugin(
            log=self.log,
            auth_admin_prefix=self._conf.get('auth_admin_prefix',
                                             group=group),
            auth_host=self._conf.get('auth_host', group=group),
            auth_port=self._conf.get('auth_port', group=group),
            auth_protocol=self._conf.get('auth_protocol', group=group),
            identity_uri=self._conf.get('identity_uri', group=group),
            admin_token=self._conf.get('admin_token', group=group),
            admin_user=self._conf.get('admin_user', group=group),
            admin_password=self._conf.get('admin_password', group=group),
            admin_tenant_name=self._conf.get('admin_tenant_name',
                                             group=group)
        )

    # Plugin option registration is normally done as part of the load_from
    # function rather than the register function so copy here.
    # NOTE(review): the listing stops at this point; the registration and
    # plugin-loading code this comment introduces is not shown.
def _get_auth_plugin(self):
    """Build the auth plugin from the middleware configuration.

    The option group is taken from ``auth_section`` and falls back to
    ``_base.AUTHTOKEN_GROUP``. When neither ``auth_type`` nor the deprecated
    ``auth_plugin`` option names a plugin, a legacy
    ``_auth.AuthTokenPlugin`` is constructed from the ``auth_*``,
    ``identity_uri`` and ``admin_*`` options of that group and returned.

    NOTE(review): only the legacy fallback is visible in this listing; the
    code that loads a named plugin is not shown.
    """
    # NOTE(jamielennox): Ideally this would use load_from_conf_options
    # however that is not possible because we have to support the override
    # pattern we use in _conf_get. This function therefore does a manual
    # version of load_from_conf_options with the fallback plugin inline.
    group = self._conf_get('auth_section') or _base.AUTHTOKEN_GROUP

    # NOTE(jamielennox): auth_plugin was deprecated to auth_type. _conf_get
    # doesn't handle that deprecation in the case of conf dict options so
    # we have to manually check the value
    # The deprecated name is read directly from self._conf rather than
    # through _conf_get.
    plugin_name = (self._conf_get('auth_type', group=group)
                   or self._conf.get('auth_plugin'))

    if not plugin_name:
        # Legacy fallback: the service credentials (admin_token, admin_user,
        # admin_password) are read from configuration and handed straight to
        # the plugin, so the configuration source holds secret material.
        # NOTE(review): self.log is passed in alongside those values --
        # confirm AuthTokenPlugin never writes them to the log.
        # NOTE(review): auth_protocol presumably selects the scheme used to
        # reach the identity service; verify it is https wherever these
        # credentials are sent.
        return _auth.AuthTokenPlugin(
            log=self.log,
            auth_admin_prefix=self._conf_get('auth_admin_prefix',
                                             group=group),
            auth_host=self._conf_get('auth_host', group=group),
            auth_port=self._conf_get('auth_port', group=group),
            auth_protocol=self._conf_get('auth_protocol', group=group),
            identity_uri=self._conf_get('identity_uri', group=group),
            admin_token=self._conf_get('admin_token', group=group),
            admin_user=self._conf_get('admin_user', group=group),
            admin_password=self._conf_get('admin_password', group=group),
            admin_tenant_name=self._conf_get('admin_tenant_name',
                                             group=group)
        )

    # Plugin option registration is normally done as part of the load_from
    # function rather than the register function so copy here.
    # NOTE(review): the listing stops at this point; the registration and
    # plugin-loading code this comment introduces is not shown.
def www_authenticate_uri(self):
    """Return the identity endpoint URI for the auth interface.

    The endpoint is looked up through the adapter with
    ``plugin.AUTH_INTERFACE``. When the adapter authenticates with the
    legacy ``_auth.AuthTokenPlugin`` the value is cut down to its base URI
    (scheme and host, no path, no trailing slash); otherwise the endpoint
    is returned as the adapter reported it.

    NOTE(review): the name suggests this value is advertised to clients in
    a WWW-Authenticate challenge. If so it should resolve to an endpoint
    that is meant to be client-visible -- confirm what
    ``plugin.AUTH_INTERFACE`` selects in the deployment.
    """
    endpoint = self._adapter.get_endpoint(interface=plugin.AUTH_INTERFACE)

    if not isinstance(self._adapter.auth, _auth.AuthTokenPlugin):
        return endpoint

    # NOTE(jamielennox): This prefix stripping only matters for the legacy
    # plugin. Joining '/' keeps just the base URI, which is the original
    # behaviour.
    base_uri = urllib.parse.urljoin(endpoint, '/')
    return base_uri.rstrip('/')
# NOTE(review): this listing starts part-way through an option list
# (presumably the _OPTS list registered below); the option that the
# following help-text fragment belongs to is not shown.
               ' binding method that must be present in tokens.'),
    # Defaults to False. Per the help text the revocation list is consulted
    # for cached tokens only when this is enabled, so with the default a
    # token that is already cached is presumably accepted without a
    # revocation re-check -- confirm against the validation path.
    cfg.BoolOpt('check_revocations_for_cached', default=False,
                help='If true, the revocation list will be checked for cached'
                ' tokens. This requires that PKI tokens are configured on the'
                ' identity server.'),
    # NOTE(review): the default is ['md5'], and the help text says the first
    # algorithm's result is what gets stored in the cache. MD5 is not
    # collision-resistant; deployments that still use PKI tokens should put
    # a SHA-2 name accepted by hashlib.new() first and keep 'md5' in the
    # list only until the old tokens have expired, as the help text
    # describes for migrations.
    cfg.ListOpt('hash_algorithms', default=['md5'],
                help='Hash algorithms to use for hashing PKI tokens. This may'
                ' be a single algorithm or multiple. The algorithms are those'
                ' supported by Python standard hashlib.new(). The hashes will'
                ' be tried in the order given, so put the preferred one first'
                ' for performance. The result of the first hash will be stored'
                ' in the cache. This will typically be set to multiple values'
                ' only while migrating from a less secure algorithm to a more'
                ' secure one. Once all the old tokens are expired this option'
                ' should be set to a single value for better performance.'),
] + _auth.OPTS

CONF = cfg.CONF
# Module-level call: the options are registered under the auth_token group
# as soon as this module is imported.
CONF.register_opts(_OPTS, group=_base.AUTHTOKEN_GROUP)

_LOG = logging.getLogger(__name__)
class _BIND_MODE(object):
    """String constants naming the token-bind enforcement modes.

    NOTE(review): how each mode is enforced is not visible in this listing.
    'disabled' and 'permissive' presumably relax the check -- confirm the
    semantics in the enforcement code before relying on token binding as a
    control.
    """

    DISABLED = 'disabled'
    PERMISSIVE = 'permissive'
    STRICT = 'strict'
    REQUIRED = 'required'
    KERBEROS = 'kerberos'
def _token_is_v2(token_info):
from keystonemiddleware.auth_token import _exceptions as ksm_exceptions
from keystonemiddleware.auth_token import _identity
from keystonemiddleware.auth_token import _opts
from keystonemiddleware.auth_token import _request
from keystonemiddleware.auth_token import _signing_dir
from keystonemiddleware.auth_token import _user_plugin
from keystonemiddleware.i18n import _
# Module logger, named after this module.
_LOG = logging.getLogger(__name__)

# Marker string; presumably what is stored in the token cache for a token
# that failed validation -- confirm at the cache call sites.
_CACHE_INVALID_INDICATOR = 'invalid'

# Module-level call, so it runs when this module is imported.
oslo_cache.configure(cfg.CONF)

# One (group name, option list) pair: the auth_token group together with
# the middleware options, the legacy auth plugin options and whatever
# loading.get_auth_common_conf_options() returns.
AUTH_TOKEN_OPTS = [
    (_base.AUTHTOKEN_GROUP,
     _opts._OPTS + _auth.OPTS + loading.get_auth_common_conf_options())
]
def list_opts():
"""Return a list of oslo_config options available in auth_token middleware.
The returned list includes all oslo_config options which may be registered
at runtime by the project.
Each element of the list is a tuple. The first element is the name of the
group under which the list of elements in the second element will be
registered. A group name of None corresponds to the [DEFAULT] group in
config files.
NOTE: This function is no longer used for oslo_config sample generation.
Some services rely on this function for listing ALL (including deprecated)