Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _validate_token(self, auth_ref):
"""Perform the validation steps on the token.
:param auth_ref: The token data
:type auth_ref: keystoneauth1.access.AccessInfo
:raises exc.InvalidToken: if token is rejected
"""
# 0 seconds of validity means it is invalid right now
if auth_ref.will_expire_soon(stale_duration=0):
raise ksm_exceptions.InvalidToken(_('Token authorization failed'))
def _verify_pkiz_token(self, signed_text, token_ids):
self._revocations.check(token_ids)
try:
uncompressed = cms.pkiz_uncompress(signed_text)
verified = self._cms_verify(uncompressed, inform=cms.PKIZ_CMS_FORM)
return verified
# TypeError If the signed_text is not zlib compressed
# binascii.Error if signed_text has incorrect base64 padding (py34)
except (TypeError, binascii.Error):
raise ksm_exceptions.InvalidToken(signed_text)
self._validate_token(user_auth_ref)
self._confirm_token_bind(user_auth_ref, request)
except ksm_exceptions.InvalidToken:
self.log.info(_LI('Invalid user token'))
request.user_token_valid = False
else:
request.user_token_valid = True
request.token_info = data
if request.service_token:
self.log.debug('Authenticating service token')
try:
_, serv_auth_ref = self._do_fetch_token(request.service_token)
self._validate_token(serv_auth_ref)
self._confirm_token_bind(serv_auth_ref, request)
except ksm_exceptions.InvalidToken:
self.log.info(_LI('Invalid service token'))
request.service_token_valid = False
else:
request.service_token_valid = True
request.token_auth = _user_plugin.UserAuthPlugin(user_auth_ref,
serv_auth_ref)
def _uncompress_pkiz(token):
# TypeError If the signed_text is not zlib compressed binascii.Error if
# signed_text has incorrect base64 padding (py34)
try:
return cms.pkiz_uncompress(token)
except (TypeError, binascii.Error):
raise ksm_exceptions.InvalidToken(token)
def _do_fetch_token(self, token, **kwargs):
"""Helper method to fetch a token and convert it into an AccessInfo."""
# NOTE(edmondsw): strip the token to remove any whitespace that may
# have been passed along in the header per bug 1689468
token = token.strip()
data = self.fetch_token(token, **kwargs)
try:
return data, access.create(body=data, auth_token=token)
except Exception:
self.log.warning('Invalid token contents.', exc_info=True)
raise ksm_exceptions.InvalidToken(_('Token authorization failed'))
def _do_fetch_token(self, token):
"""Helper method to fetch a token and convert it into an AccessInfo"""
data = self._fetch_token(token)
try:
return data, access.create(body=data, auth_token=token)
except Exception:
self.log.warning(_LW('Invalid token contents.'), exc_info=True)
raise ksm_exceptions.InvalidToken(_('Token authorization failed'))
def fetch_token(self, token, **kwargs):
"""Low level replacement of fetch_token for AuthProtocol."""
token_data = self._token_data.get(token, {})
if token_data:
self._assert_token_not_expired(token_data.expires)
return token_data
raise _exceptions.InvalidToken()