def _assert_token_not_expired(self, token_expires):
    """Raise InvalidToken when the token's expiry time is in the past.

    :param token_expires: expiry timestamp of the token being validated;
        it is passed through ``timeutils.normalize_time`` before the
        comparison.
    :raises _exceptions.InvalidToken: if the current time is later than
        the normalized expiry.
    """
    # The decision rests on this host's clock as read by timeutils.utcnow().
    current_time = timeutils.utcnow()
    # NOTE(review): presumably normalize_time yields a value directly
    # comparable with utcnow() (same timezone handling) - confirm.
    expiry = timeutils.normalize_time(token_expires)
    # Strict comparison: a token whose expiry equals the current instant
    # is still accepted; only a later clock reading rejects it.
    token_is_expired = current_time > expiry
    if token_is_expired:
        raise _exceptions.InvalidToken()
:param request: Incoming request
:type request: _request.AuthTokenRequest
"""
user_auth_ref = None
serv_auth_ref = None
allow_expired = False
if request.service_token:
self.log.debug('Authenticating service token')
try:
_, serv_auth_ref = self._do_fetch_token(request.service_token)
self._validate_token(serv_auth_ref)
self._confirm_token_bind(serv_auth_ref, request)
except ksm_exceptions.InvalidToken:
self.log.info('Invalid service token')
request.service_token_valid = False
else:
# FIXME(jamielennox): The new behaviour for service tokens is
# that they have to pass the policy check to be allowed.
# Previously any token was accepted here. For now we will
# continue to mark service tokens as valid if they are valid
# but we will only allow service role tokens to do
# allow_expired. In future we should reject any token that
# isn't a service token here.
role_names = set(serv_auth_ref.role_names)
check = self._service_token_roles.intersection(role_names)
role_check_passed = bool(check)
# if service_token_roles_required then the service token is
# only valid if the roles check out. Otherwise at this point it
def check(self, token_ids):
    """Reject the token when any of its ids is reported as revoked.

    :param token_ids: ids to test; handed unchanged to
        ``self._any_revoked``.
    :raises exc.InvalidToken: when ``self._any_revoked`` returns a truthy
        value for ``token_ids``.
    """
    revoked = self._any_revoked(token_ids)
    if not revoked:
        return
    # The log line is debug-level and contains no token id, so a revoked
    # token being presented is only visible with debug logging enabled.
    self._log.debug('Token is marked as having been revoked')
    raise exc.InvalidToken(_('Token has been revoked'))
def auth_filter(app):
    """Wrap *app* in the AuthProtocol middleware.

    ``conf`` is a free variable taken from the enclosing scope, which is
    not shown in this excerpt.

    :param app: the application to wrap.
    :returns: ``AuthProtocol(app, conf)``.
    """
    protected_app = AuthProtocol(app, conf)
    return protected_app
return auth_filter
def app_factory(global_conf, **local_conf):
    """Build an AuthProtocol instance with no wrapped application.

    :param global_conf: base configuration mapping; it is shallow-copied,
        so the caller's mapping is not updated by this function.
    :param local_conf: per-application settings; on a key collision these
        replace the value from ``global_conf``.
    :returns: ``AuthProtocol(None, <merged configuration>)``.
    """
    merged_conf = global_conf.copy()
    # Local settings are applied last so that they override global ones.
    merged_conf.update(local_conf)
    return AuthProtocol(None, merged_conf)
# NOTE(jamielennox): Maintained here for public API compatibility.
# Each name is a plain alias of the ksm_exceptions class, not a subclass,
# so ``except InvalidToken`` written against this module catches exactly
# what code raising ``ksm_exceptions.InvalidToken`` raises.
InvalidToken = ksm_exceptions.InvalidToken
ServiceError = ksm_exceptions.ServiceError
ConfigurationError = ksm_exceptions.ConfigurationError
def verify():
    """Verify the CMS signature on ``data`` and return the decoded text.

    ``self``, ``data`` and ``inform`` are free variables from the
    enclosing scope, which is not shown in this excerpt.

    :returns: the bytes returned by ``cms.cms_verify`` decoded as UTF-8.
    :raises ksm_exceptions.InvalidToken: when verification raises
        ``CMSError`` or ``CalledProcessError``.
    """
    try:
        # Path lookups stay inside the try block so that a CMSError from
        # them is handled the same way as one from verification itself.
        cert_path = self._signing_directory.calc_path(
            self._SIGNING_CERT_FILE_NAME)
        ca_path = self._signing_directory.calc_path(
            self._SIGNING_CA_FILE_NAME)
        verified = cms.cms_verify(data, cert_path, ca_path, inform=inform)
        return verified.decode('utf-8')
    except (ksc_exceptions.CMSError,
            cms.subprocess.CalledProcessError) as err:
        # Fail closed: both error types become InvalidToken. The detail
        # goes only to the log; the raised exception carries a generic
        # message.
        self.log.warning(_LW('Verify error: %s'), err)
        msg = _('Token authorization failed')
        raise ksm_exceptions.InvalidToken(msg)