Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_user(cls, token):
try:
dtoken = JwtManager.decode_token(token)
if not JwtManager.is_blacklisted(dtoken['jti']):
user = AuthManager.get_user(dtoken['username'])
if user.last_update <= dtoken['iat']:
return user
cls.logger.debug("user info changed after token was issued, iat=%s last_update=%s",
dtoken['iat'], user.last_update)
else:
cls.logger.debug('Token is black-listed')
except jwt.exceptions.ExpiredSignatureError:
cls.logger.debug("Token has expired")
except jwt.exceptions.InvalidTokenError:
cls.logger.debug("Failed to decode token")
except UserDoesNotExist:
cls.logger.debug("Invalid token: user %s does not exist", dtoken['username'])
return None
pass
class ExpiredSignatureError(InvalidTokenError):
pass
class InvalidAudienceError(InvalidTokenError):
pass
class InvalidIssuerError(InvalidTokenError):
pass
class InvalidIssuedAtError(InvalidTokenError):
pass
class ImmatureSignatureError(InvalidTokenError):
pass
class InvalidKeyError(Exception):
pass
class InvalidAlgorithmError(InvalidTokenError):
pass
class MissingRequiredClaimError(InvalidTokenError):
return provider
# Ask the OAuthAuthenticationProvider to turn its token
# into a Patron.
return provider.authenticated_patron(_db, provider_token)
elif (self.saml_providers_by_name
and isinstance(header, basestring)
and 'bearer' in header.lower()):
# The patron wants to use an
# SAMLAuthenticationProvider. Figure out which one.
try:
provider_name, provider_token = self.decode_bearer_token_from_header(
header
)
except jwt.exceptions.InvalidTokenError, e:
return INVALID_SAML_BEARER_TOKEN
provider = self.saml_provider_lookup(provider_name)
if isinstance(provider, ProblemDetail):
# There was a problem turning the provider name into
# a registered SAMLAuthenticationProvider.
return provider
# Ask the SAMLAuthenticationProvider to turn its token
# into a Patron.
return provider.authenticated_patron(_db, provider_token)
# We were unable to determine what was going on with the
# Authenticate header.
return UNSUPPORTED_AUTHENTICATION_MECHANISM
@api_bp.errorhandler(InvalidTokenError)
def invalid_token_error_handler(ex):
"""Register error handler for InvalidTokenError"""
ex = BadRequestException('Something Wrong',
description=ex.message)
return api_exception_handler(ex)
pass
class ImmatureSignatureError(InvalidTokenError):
pass
class InvalidKeyError(Exception):
pass
class InvalidAlgorithmError(InvalidTokenError):
pass
class MissingRequiredClaimError(InvalidTokenError):
def __init__(self, claim):
self.claim = claim
def __str__(self):
return 'Token is missing the "%s" claim' % self.claim
# Compatibility aliases (deprecated)
ExpiredSignature = ExpiredSignatureError
InvalidAudience = InvalidAudienceError
InvalidIssuer = InvalidIssuerError
def verify_password_reset_token(token) -> Optional[str]:
try:
decoded_token = jwt.decode(token, config.SECRET_KEY, algorithms=["HS256"])
assert decoded_token["sub"] == password_reset_jwt_subject
return decoded_token["username"]
except InvalidTokenError:
return None
def do_jwt_login(self, login_submission):
token = login_submission.get("token", None)
if token is None:
raise LoginError(
401, "Token field for JWT is missing",
errcode=Codes.UNAUTHORIZED
)
import jwt
from jwt.exceptions import InvalidTokenError
try:
payload = jwt.decode(token, self.jwt_secret, algorithms=[self.jwt_algorithm])
except jwt.ExpiredSignatureError:
raise LoginError(401, "JWT expired", errcode=Codes.UNAUTHORIZED)
except InvalidTokenError:
raise LoginError(401, "Invalid JWT", errcode=Codes.UNAUTHORIZED)
user = payload.get("sub", None)
if user is None:
raise LoginError(401, "Invalid JWT", errcode=Codes.UNAUTHORIZED)
user_id = UserID(user, self.hs.hostname).to_string()
auth_handler = self.auth_handler
registered_user_id = yield auth_handler.check_user_exists(user_id)
if registered_user_id:
device_id = login_submission.get("device_id")
initial_display_name = login_submission.get("initial_device_display_name")
device_id, access_token = yield self.registration_handler.register_device(
registered_user_id, device_id, initial_display_name,
)
def _validate_iap_jwt(iap_jwt, expected_audience):
try:
key_id = jwt.get_unverified_header(iap_jwt).get('kid')
if not key_id:
return (None, None, '**ERROR: no key ID**')
key = get_iap_key(key_id)
decoded_jwt = jwt.decode(
iap_jwt, key,
algorithms=['ES256'],
issuer='https://cloud.google.com/iap',
audience=expected_audience)
return (decoded_jwt['sub'], decoded_jwt['email'], '')
except (jwt.exceptions.InvalidTokenError,
requests.exceptions.RequestException) as e:
return (None, None, '**ERROR: JWT validation error {}**'.format(e))
def ValidateIapJwt(iap_jwt, expected_audience):
"""Validates an IAP JWT."""
try:
key_id = jwt.get_unverified_header(iap_jwt).get("kid")
if not key_id:
raise IAPValidationFailedError("No key ID")
key = GetIapKey(key_id)
decoded_jwt = jwt.decode(
iap_jwt, key, algorithms=["ES256"], audience=expected_audience)
return (decoded_jwt["sub"], decoded_jwt["email"])
except (jwt.exceptions.InvalidTokenError,
requests.exceptions.RequestException) as e:
raise IAPValidationFailedError(e)
def process_exception(self, request, exception):
"""Handle all InvalidTokenErrors."""
if isinstance(exception, InvalidTokenError):
logger.exception("JWT request token error")
response = _403(request, exception)
if getattr(request, "token", None):
request.token.log(request, response, error=exception)
return response