Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _validate_offline(self, token, token_hashes):
try:
if cms.is_pkiz(token):
verified = self._verify_pkiz_token(token, token_hashes)
elif cms.is_asn1_token(token):
verified = self._verify_signed_token(token, token_hashes)
else:
# Can't do offline validation for this type of token.
return
except ksc_exceptions.CertificateConfigError:
self.log.warning(_LW('Fetch certificate config failed, '
'fallback to online validation.'))
except ksm_exceptions.RevocationListError:
self.log.warning(_LW('Fetch revocation list failed, '
'fallback to online validation.'))
else:
data = jsonutils.loads(verified)
return data
log=log,
enforce_token_bind=self._conf_get('enforce_token_bind'))
# delay_auth_decision means we still allow unauthenticated requests
# through and we let the downstream service make the final decision
self._delay_auth_decision = self._conf_get('delay_auth_decision')
self._include_service_catalog = self._conf_get(
'include_service_catalog')
self._hash_algorithms = self._conf_get('hash_algorithms')
self._identity_server = self._create_identity_server()
self._auth_uri = self._conf_get('auth_uri')
if not self._auth_uri:
self.log.warning(
_LW('Configuring auth_uri to point to the public identity '
'endpoint is required; clients may not be able to '
'authenticate against an admin endpoint'))
# FIXME(dolph): drop support for this fallback behavior as
# documented in bug 1207517.
self._auth_uri = self._identity_server.auth_uri
self._signing_directory = _signing_dir.SigningDirectory(
directory_name=self._conf_get('signing_dir'), log=self.log)
self._token_cache = self._token_cache_factory()
revocation_cache_timeout = datetime.timedelta(
seconds=self._conf_get('revocation_cache_time'))
self._revocations = _revocations.Revocations(revocation_cache_timeout,
def _do_fetch_token(self, token):
"""Helper method to fetch a token and convert it into an AccessInfo"""
data = self._fetch_token(token)
try:
return data, access.create(body=data, auth_token=token)
except Exception:
self.log.warning(_LW('Invalid token contents.'), exc_info=True)
raise ksm_exceptions.InvalidToken(_('Token authorization failed'))
def verify():
try:
signing_cert_path = self._signing_directory.calc_path(
self._SIGNING_CERT_FILE_NAME)
signing_ca_path = self._signing_directory.calc_path(
self._SIGNING_CA_FILE_NAME)
return cms.cms_verify(data, signing_cert_path,
signing_ca_path,
inform=inform).decode('utf-8')
except (ksc_exceptions.CMSError,
cms.subprocess.CalledProcessError) as err:
self.log.warning(_LW('Verify error: %s'), err)
msg = _('Token authorization failed')
raise ksm_exceptions.InvalidToken(msg)
def _validate_offline(self, token, token_hashes):
try:
if cms.is_pkiz(token):
verified = self._verify_pkiz_token(token, token_hashes)
elif cms.is_asn1_token(token):
verified = self._verify_signed_token(token, token_hashes)
else:
# Can't do offline validation for this type of token.
return
except ksc_exceptions.CertificateConfigError:
self.log.warning(_LW('Fetch certificate config failed, '
'fallback to online validation.'))
except ksm_exceptions.RevocationListError:
self.log.warning(_LW('Fetch revocation list failed, '
'fallback to online validation.'))
else:
data = jsonutils.loads(verified)
return data
if not data:
data = self._identity_server.verify_token(token)
self._token_cache.store(token_hashes[0], data)
except (ksa_exceptions.ConnectFailure,
ksa_exceptions.RequestTimeout,
ksm_exceptions.RevocationListError,
ksm_exceptions.ServiceError) as e:
self.log.critical(_LC('Unable to validate token: %s'), e)
raise webob.exc.HTTPServiceUnavailable()
except ksm_exceptions.InvalidToken:
self.log.debug('Token validation failure.', exc_info=True)
if token_hashes:
self._token_cache.store_invalid(token_hashes[0])
self.log.warning(_LW('Authorization failed for token'))
raise
except Exception:
self.log.critical(_LC('Unable to validate token'), exc_info=True)
raise webob.exc.HTTPInternalServerError()
return data