Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_validate_exp(self):
    """Check how ``validate`` reacts to two different ``exp`` claim values.

    A non-numeric ``exp`` is expected to raise ``InvalidClaimError``; an
    ``exp`` of ``1234`` is expected to raise ``ExpiredTokenError``.
    """
    # Case 1: ``exp`` is a string instead of a number.
    token = jwt.encode({'alg': 'HS256'}, {'exp': 'invalid'}, 'k')
    decoded = jwt.decode(token, 'k')
    with self.assertRaises(errors.InvalidClaimError):
        decoded.validate()

    # Case 2: ``exp`` is the integer 1234.
    token = jwt.encode({'alg': 'HS256'}, {'exp': 1234}, 'k')
    decoded = jwt.decode(token, 'k')
    with self.assertRaises(errors.ExpiredTokenError):
        decoded.validate()
def test_encode_datetime(self):
    """Check that a ``datetime`` given as ``exp`` is decoded back as an ``int``."""
    # ``datetime.utcnow()`` is deprecated since Python 3.12 and returns a
    # naive object; use a timezone-aware UTC timestamp instead.
    now = datetime.datetime.now(datetime.timezone.utc)
    id_token = jwt.encode({'alg': 'HS256'}, {'exp': now}, 'k')
    claims = jwt.decode(id_token, 'k')
    self.assertIsInstance(claims.exp, int)
def test_validate_nbf(self):
    """Exercise ``validate`` against three ``nbf`` scenarios.

    * a non-numeric ``nbf`` must raise ``InvalidClaimError``;
    * ``nbf`` of ``1234`` with a default ``validate()`` call must pass;
    * ``nbf`` of ``1234`` with ``validate(123)`` must raise
      ``InvalidTokenError``.
    """
    # Scenario 1: ``nbf`` is a string instead of a number.
    token = jwt.encode({'alg': 'HS256'}, {'nbf': 'invalid'}, 'k')
    decoded = jwt.decode(token, 'k')
    with self.assertRaises(errors.InvalidClaimError):
        decoded.validate()

    # Scenario 2: validation with default arguments must not raise.
    token = jwt.encode({'alg': 'HS256'}, {'nbf': 1234}, 'k')
    decoded = jwt.decode(token, 'k')
    decoded.validate()

    # Scenario 3: same claim, but 123 is passed as the first positional
    # argument of ``validate`` (presumably the reference time -- confirm
    # against the claims class).
    token = jwt.encode({'alg': 'HS256'}, {'nbf': 1234}, 'k')
    decoded = jwt.decode(token, 'k')
    with self.assertRaises(errors.InvalidTokenError):
        decoded.validate(123)
def test_use_jwe(self):
    """Round-trip a payload through ``jwt.encode``/``jwt.decode`` with JWE headers.

    The payload is encoded with the RSA public key using the ``RSA-OAEP`` /
    ``A256GCM`` header and decoded with the matching private key.
    """
    private_pem = read_file_path('rsa_private.pem')
    public_pem = read_file_path('rsa_public.pem')

    jwe_header = {'alg': 'RSA-OAEP', 'enc': 'A256GCM'}
    token = jwt.encode(jwe_header, {'name': 'hi'}, public_pem)

    # The encoded token is expected to contain exactly four '.' separators.
    self.assertEqual(token.count(b'.'), 4)

    self.assertEqual(jwt.decode(token, private_pem)['name'], 'hi')
def process_assertion_claims(self, assertion):
    """Extract JWT payload claims from request "assertion", per
    `Section 3.1`_.

    Both decoding and claim validation run inside the same ``try`` so that
    any ``JoseError`` (from either step) is reported to the caller as
    ``InvalidGrantError`` rather than escaping as a raw JOSE error.

    :param assertion: assertion string value in the request
    :return: JWTClaims
    :raise: InvalidGrantError

    .. _`Section 3.1`: https://tools.ietf.org/html/rfc7523#section-3.1
    """
    try:
        claims = jwt.decode(
            assertion, self.resolve_public_key,
            claims_options=self.create_claims_options())
        claims.validate()
    except JoseError as e:
        log.debug('Assertion Error: %r', e)
        raise InvalidGrantError(description=e.description)
    return claims
def parse_id_token(token_data, nonce):
    """Decode and validate the ID token in *token_data*.

    The claims class is ``CodeIDToken`` when *token_data* also carries an
    ``access_token`` (which is then passed along for validation) and
    ``ImplicitIDToken`` otherwise. The ``iss`` claim is restricted to the
    configured ``OIDC_ISSUER`` and validation allows 120 seconds of leeway.

    :param token_data: mapping containing ``id_token`` and optionally
        ``access_token``
    :param nonce: nonce forwarded to claim validation via ``claims_params``
    :return: ``UserInfo`` built from the validated claims
    :raise: ``requests`` exceptions if the JWKS document cannot be fetched;
        any error raised by ``jwt.decode`` or ``claims.validate``
    """
    def load_key(header, payload):
        # TODO: cache this?
        # Bound the request so an unresponsive JWKS endpoint cannot block the
        # caller forever, and fail on HTTP errors instead of trying to parse
        # an error response as a key set.
        response = requests.get(current_app.config['OIDC_JWKS_URL'], timeout=10)
        response.raise_for_status()
        jwk_set = response.json()
        return jwk.loads(jwk_set, header.get('kid'))

    id_token = token_data['id_token']
    claims_params = {'nonce': nonce, 'client_id': current_app.config['OIDC_CLIENT_ID']}
    if 'access_token' in token_data:
        claims_params['access_token'] = token_data['access_token']
        claims_cls = CodeIDToken
    else:
        claims_cls = ImplicitIDToken
    claims_options = {'iss': {'values': [current_app.config['OIDC_ISSUER']]}}
    claims = jwt.decode(
        id_token,
        key=load_key,
        claims_cls=claims_cls,
        claims_options=claims_options,
        claims_params=claims_params,
    )
    claims.validate(leeway=120)
    return UserInfo(claims)
def _parse_id_token(self, token_data, nonce):
    """Decode and validate the ID token, returning user info without internal fields.

    :param token_data: mapping containing ``id_token`` and optionally
        ``access_token``
    :param nonce: nonce forwarded to claim validation via ``claims_params``
    :return: ``UserInfo`` built from the validated claims, with every key
        listed in ``INTERNAL_FIELDS`` removed
    """
    raw_token = token_data['id_token']
    params = {'nonce': nonce, 'client_id': self.oidc_settings['client_id']}

    # The presence of an access token selects the claims class and adds the
    # access token itself to the validation parameters.
    with_access_token = 'access_token' in token_data
    if with_access_token:
        params['access_token'] = token_data['access_token']
    token_cls = CodeIDToken if with_access_token else ImplicitIDToken

    # XXX: should we allow extra claims to be specified in the settings?
    options = {'iss': {'values': [self.oidc_settings['issuer']]}}

    claims = jwt.decode(
        raw_token,
        key=self._load_jwk,
        claims_cls=token_cls,
        claims_options=options,
        claims_params=params,
    )
    claims.validate(leeway=120)

    user_info = UserInfo(claims)
    for field in INTERNAL_FIELDS:
        user_info.pop(field, None)
    return user_info
def process_assertion_claims(self, assertion, resolve_key):
    """Decode and validate the JWT carried in the request "assertion", per
    `Section 3.1`_.

    Any ``JoseError`` raised while decoding or validating is logged at debug
    level and replaced by ``InvalidClientError``.

    :param assertion: assertion string value in the request
    :param resolve_key: function to resolve the sign key
    :return: JWTClaims
    :raise: InvalidClientError

    .. _`Section 3.1`: https://tools.ietf.org/html/rfc7523#section-3.1
    """
    try:
        decoded = jwt.decode(
            assertion,
            resolve_key,
            claims_options=self.create_claims_options(),
        )
        decoded.validate()
    except JoseError as exc:
        log.debug('Assertion Error: %r', exc)
        raise InvalidClientError()
    else:
        return decoded