Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_decode_id_token_expired(self):
"""
Attempt to decode an expired id_token, confirms that the token is
decoded, but errors out on the expired signature.
"""
with self.assertRaises(jwt.exceptions.ExpiredSignatureError):
self.response.decode_id_token()
# test with deprecated usage pattern too
with self.assertRaises(jwt.exceptions.ExpiredSignatureError):
self.response.decode_id_token(self.ac)
def authenticate(self):
auth_header = self.request.headers.get('Authorization').split(' ')
if len(auth_header) != 2:
return False
bearer, token = auth_header
conf = dci_config.CONFIG
try:
decoded_token = auth.decode_jwt(token,
conf['SSO_PUBLIC_KEY'],
conf['SSO_CLIENT_ID'])
except (jwt_exc.DecodeError, ValueError):
decoded_token = sso.decode_token_with_latest_public_key(token)
except jwt_exc.ExpiredSignatureError:
raise dci_exc.DCIException('JWT token expired, please refresh.',
status_code=401)
team_id = None
ro_group = conf['SSO_READ_ONLY_GROUP']
realm_access = decoded_token['realm_access']
if 'roles' in realm_access and ro_group in realm_access['roles']:
team_id = flask.g.team_redhat_id
user_info = self._get_user_info(decoded_token)
try:
self.identity = self._get_or_create_user(user_info, team_id)
except sa_exc.IntegrityError:
raise dci_exc.DCICreationConflict(models.USERS.name, 'username')
return True
app.logger.info('No header')
return None
# Retrieve the Bearer token
parse_result = parse('Bearer {}', header)
if not parse_result:
app.logger.info(f'Wrong format for header "{header}"')
return None
token = parse_result[0]
try:
decoded_token = decode_token(token.encode('utf8'), public_key)
except jwt.exceptions.DecodeError:
app.logger.warning(f'Error decoding header "{header}". '
'This may be key missmatch or wrong key')
return None
except jwt.exceptions.ExpiredSignatureError:
app.logger.error(f'Authentication header has expired')
return None
# Check expiry is in the token
if 'exp' not in decoded_token:
app.logger.warning('Token does not have expiry (exp)')
return None
# Check username is in the token
if 'username' not in decoded_token:
app.logger.warning('Token does not have username')
return None
app.logger.info('Header successfully validated')
return decoded_token['username']
def decode_token(self, request):
token = request.GET.get(CreateUserView.TOKEN_QUERY_NAME)
if not token:
return None
try:
payload = jwt.decode(
token.encode('utf-8'),
settings.SECRET_KEY,
algorithms=['HS256'],
verify=True
)
pass
except jwt.exceptions.ExpiredSignatureError:
payload = False
except:
payload = None
return payload
def _validate_exp(self, payload, now, leeway):
try:
exp = int(payload["exp"])
except ValueError:
raise DecodeError(
"Expiration Time claim (exp) must be an" " integer."
)
if exp < (now - leeway):
raise ExpiredSignatureError("Signature has expired")
if 'exp' not in payload_keys:
raise exceptions.MissingRequiredClaimError('exp')
if 'type' not in payload_keys:
raise exceptions.MissingRequiredClaimError('type')
if 'identity' not in payload_keys:
raise exceptions.MissingRequiredClaimError('identity')
if 'iat' not in payload_keys:
raise exceptions.MissingRequiredClaimError('iat')
# signature compare
if bsignature is not make_signature(bheader, bpayload, secret):
raise exceptions.InvalidSignatureError('Invalid token signature')
# expire check
if payload['exp'] > int(time.time()):
raise exceptions.ExpiredSignatureError('Token has been expired')
return header, payload
async def get(self):
unverified_base64_token = self.get_argument('token')
unverified_token = base64.urlsafe_b64decode(unverified_base64_token)
try:
token = jwt.decode(unverified_token, public_key, algorithms='RS256')
except jwt.exceptions.ExpiredSignatureError:
raise HTTPError(
403, "Sharing link has expired. Ask for a fresh link."
)
except jwt.exceptions.InvalidSignatureError:
raise HTTPError(
403, ("Sharing link has an invalid signature. Was it "
"copy/pasted in full?")
)
app_log.info("Honoring token %s", token)
source_username = token['user']
user_options = token['opts']
source_path = token['path']
dest_path = self.get_argument('dest_path',
os.path.basename(source_path))
"""
try:
return jwt.decode(
jwt=token,
key=settings.JWT_AUTH_SETTINGS["PUBLIC_KEY"],
algorithms=[settings.JWT_AUTH_SETTINGS["ALGORITHM"]],
issuer=settings.JWT_AUTH_SETTINGS["ISSUER"],
audience=settings.JWT_AUTH_SETTINGS["AUDIENCE"],
verify=verify,
)
except jwt.exceptions.InvalidSignatureError:
raise jwt.exceptions.InvalidTokenError(
"The token signature could not be verified."
)
except jwt.exceptions.ExpiredSignatureError:
raise jwt.exceptions.InvalidTokenError("Expired token.")
except jwt.exceptions.InvalidAudienceError:
raise jwt.exceptions.InvalidTokenError("Invalid audience.")
except jwt.exceptions.InvalidIssuerError:
raise jwt.exceptions.InvalidTokenError("Invalid issuer.")
except jwt.exceptions.InvalidTokenError:
raise jwt.exceptions.InvalidTokenError("Invalid token.")