Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _create_auth_plugin(self):
# NOTE(jamielennox): Ideally this would use load_from_conf_options
# however that is not possible because we have to support the override
# pattern we use in _conf.get. This function therefore does a manual
# version of load_from_conf_options with the fallback plugin inline.
group = self._conf.get('auth_section') or _base.AUTHTOKEN_GROUP
# NOTE(jamielennox): auth_plugin was deprecated to auth_type. _conf.get
# doesn't handle that deprecation in the case of conf dict options so
# we have to manually check the value
plugin_name = (self._conf.get('auth_type', group=group)
or self._conf.paste_overrides.get('auth_plugin'))
if not plugin_name:
return _auth.AuthTokenPlugin(
log=self.log,
auth_admin_prefix=self._conf.get('auth_admin_prefix',
group=group),
auth_host=self._conf.get('auth_host', group=group),
auth_port=self._conf.get('auth_port', group=group),
auth_protocol=self._conf.get('auth_protocol', group=group),
identity_uri=self._conf.get('identity_uri', group=group),
' tokens. This requires that PKI tokens are configured on the'
' identity server.'),
cfg.ListOpt('hash_algorithms', default=['md5'],
help='Hash algorithms to use for hashing PKI tokens. This may'
' be a single algorithm or multiple. The algorithms are those'
' supported by Python standard hashlib.new(). The hashes will'
' be tried in the order given, so put the preferred one first'
' for performance. The result of the first hash will be stored'
' in the cache. This will typically be set to multiple values'
' only while migrating from a less secure algorithm to a more'
' secure one. Once all the old tokens are expired this option'
' should be set to a single value for better performance.'),
] + _auth.OPTS
CONF = cfg.CONF
CONF.register_opts(_OPTS, group=_base.AUTHTOKEN_GROUP)
_LOG = logging.getLogger(__name__)
class _BIND_MODE(object):
DISABLED = 'disabled'
PERMISSIVE = 'permissive'
STRICT = 'strict'
REQUIRED = 'required'
KERBEROS = 'kerberos'
def _token_is_v2(token_info):
return ('access' in token_info)
from keystonemiddleware.auth_token import _cache
from keystonemiddleware.auth_token import _exceptions as ksm_exceptions
from keystonemiddleware.auth_token import _identity
from keystonemiddleware.auth_token import _opts
from keystonemiddleware.auth_token import _request
from keystonemiddleware.auth_token import _signing_dir
from keystonemiddleware.auth_token import _user_plugin
from keystonemiddleware.i18n import _
_LOG = logging.getLogger(__name__)
_CACHE_INVALID_INDICATOR = 'invalid'
oslo_cache.configure(cfg.CONF)
AUTH_TOKEN_OPTS = [
(_base.AUTHTOKEN_GROUP,
_opts._OPTS + _auth.OPTS + loading.get_auth_common_conf_options())
]
def list_opts():
"""Return a list of oslo_config options available in auth_token middleware.
The returned list includes all oslo_config options which may be registered
at runtime by the project.
Each element of the list is a tuple. The first element is the name of the
group under which the list of elements in the second element will be
registered. A group name of None corresponds to the [DEFAULT] group in
config files.
NOTE: This function is no longer used for oslo_config sample generation.
'Keystone installation, or otherwise bypassing '
'the normal authentication process. This option '
'should not be used, use `admin_user` and '
'`admin_password` instead.'),
cfg.StrOpt('admin_user',
help='Service username.'),
cfg.StrOpt('admin_password',
secret=True,
help='Service user password.'),
cfg.StrOpt('admin_tenant_name',
default='admin',
help='Service tenant name.'),
]
cfg.CONF.register_opts(OPTS, group=_base.AUTHTOKEN_GROUP)
' list must be present. For backwards compatibility reasons'
' this currently only affects the allow_expired check.'),
cfg.BoolOpt('service_token_roles_required', default=False,
help='For backwards compatibility reasons we must let valid'
' service tokens pass that don\'t pass the service_token_roles'
' check as valid. Setting this true will become the default'
' in a future release and should be enabled if possible.'),
cfg.StrOpt('service_type',
help='The name or type of the service as it appears in the'
' service catalog. This is used to validate tokens that have'
' restricted access rules.'),
]
CONF = cfg.CONF
CONF.register_opts(_OPTS, group=_base.AUTHTOKEN_GROUP)
loading.register_auth_conf_options(cfg.CONF, _base.AUTHTOKEN_GROUP)
auth_token_opts = [
(_base.AUTHTOKEN_GROUP, _OPTS + loading.get_auth_common_conf_options()),
]
__all__ = (
'list_opts',
)
def list_opts():
"""Return a list of oslo_config options available in auth_token middleware.
The returned list includes the non-deprecated oslo_config options which may
else:
default_config_files = None
# For unit tests, support passing in a ConfigOpts in
# oslo_config_config.
self._local_oslo_config = conf.get('oslo_config_config',
cfg.ConfigOpts())
self._local_oslo_config(
{}, project=conf['oslo_config_project'],
default_config_files=default_config_files,
validate_default_values=True)
self._local_oslo_config.register_opts(
_OPTS, group=_base.AUTHTOKEN_GROUP)
loading.register_auth_conf_options(self._local_oslo_config,
group=_base.AUTHTOKEN_GROUP)
super(AuthProtocol, self).__init__(
app,
log=log,
enforce_token_bind=self._conf_get('enforce_token_bind'))
# delay_auth_decision means we still allow unauthenticated requests
# through and we let the downstream service make the final decision
self._delay_auth_decision = self._conf_get('delay_auth_decision')
self._include_service_catalog = self._conf_get(
'include_service_catalog')
self._hash_algorithms = self._conf_get('hash_algorithms')
self._identity_server = self._create_identity_server()
self._auth_uri = self._conf_get('auth_uri')
' this currently only affects the allow_expired check.'),
cfg.BoolOpt('service_token_roles_required', default=False,
help='For backwards compatibility reasons we must let valid'
' service tokens pass that don\'t pass the service_token_roles'
' check as valid. Setting this true will become the default'
' in a future release and should be enabled if possible.'),
cfg.StrOpt('service_type',
help='The name or type of the service as it appears in the'
' service catalog. This is used to validate tokens that have'
' restricted access rules.'),
]
CONF = cfg.CONF
CONF.register_opts(_OPTS, group=_base.AUTHTOKEN_GROUP)
loading.register_auth_conf_options(cfg.CONF, _base.AUTHTOKEN_GROUP)
auth_token_opts = [
(_base.AUTHTOKEN_GROUP, _OPTS + loading.get_auth_common_conf_options()),
]
__all__ = (
'list_opts',
)
def list_opts():
"""Return a list of oslo_config options available in auth_token middleware.
The returned list includes the non-deprecated oslo_config options which may
be registered at runtime by the project. The purpose of this is to allow