Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_no_public_key(self):
    """Check the error returned when the endpoint resolves no public key.

    A subclass of ``ClientRegistrationEndpoint`` whose ``resolve_public_key``
    returns ``None`` is installed, an RS256-signed software statement is
    posted to ``/create_client``, and the ``error`` field of the JSON
    response is expected to be ``unapproved_software_statement``.
    """
    class ClientRegistrationEndpoint2(ClientRegistrationEndpoint):
        # Override so that no key is available for the software statement.
        def resolve_public_key(self, request):
            return None

    payload = {'software_id': 'uuid-123', 'client_name': 'Authlib'}
    s = jwt.encode({'alg': 'RS256'}, payload, read_file_path('rsa_private.pem'))
    body = {
        'software_statement': s.decode('utf-8'),
    }

    self.prepare_data(ClientRegistrationEndpoint2)
    headers = {'Authorization': 'bearer abc'}
    rv = self.client.post('/create_client', json=body, headers=headers)
    resp = json.loads(rv.data)
    # assertIn(a, b) with a string b is only a substring check, so an empty
    # or truncated error code would pass; require the exact code instead.
    self.assertEqual(resp['error'], 'unapproved_software_statement')
def test_software_statement(self):
    """Post a signed software statement and check the registration response.

    The response JSON must contain a ``client_id`` and echo back the
    ``client_name`` carried inside the statement.
    """
    signed = jwt.encode(
        {'alg': 'RS256'},
        {'software_id': 'uuid-123', 'client_name': 'Authlib'},
        read_file_path('rsa_private.pem'),
    )
    statement = signed.decode('utf-8')

    self.prepare_data()
    rv = self.client.post(
        '/create_client',
        json={'software_statement': statement},
        headers={'Authorization': 'bearer abc'},
    )
    registered = json.loads(rv.data)

    self.assertIn('client_id', registered)
    self.assertEqual(registered['client_name'], 'Authlib')
def test_validate_essential_claims(self):
    """Validate passes for a present essential claim and fails for a missing one."""
    token = jwt.encode({'alg': 'HS256'}, {'iss': 'foo'}, 'k')
    iss_rule = {'essential': True, 'values': ['foo']}
    claims = jwt.decode(token, 'k', claims_options={'iss': iss_rule})

    # 'iss' is present and in the allowed values, so this must not raise.
    claims.validate()

    # The token carries no 'sub', so marking it essential must fail.
    claims.options = {'sub': {'essential': True}}
    with self.assertRaises(errors.MissingClaimError):
        claims.validate()
def test_encode_datetime(self):
    """A datetime given as ``exp`` is exposed as an int after a round trip."""
    token = jwt.encode(
        {'alg': 'HS256'},
        {'exp': datetime.datetime.utcnow()},
        'k',
    )
    decoded = jwt.decode(token, 'k')
    self.assertIsInstance(decoded.exp, int)
def test_validate_exp(self):
    """Validation rejects a non-numeric ``exp`` and an ``exp`` of 1234."""
    cases = (
        ('invalid', errors.InvalidClaimError),
        (1234, errors.ExpiredTokenError),
    )
    for exp_value, expected_error in cases:
        token = jwt.encode({'alg': 'HS256'}, {'exp': exp_value}, 'k')
        decoded = jwt.decode(token, 'k')
        with self.assertRaises(expected_error):
            decoded.validate()
def test_invalid_values(self):
    """An ``iss`` that matches neither ``values`` nor ``value`` is rejected."""
    token = jwt.encode({'alg': 'HS256'}, {'iss': 'foo'}, 'k')
    decoded = jwt.decode(
        token, 'k', claims_options={'iss': {'values': ['bar']}}
    )

    # List form of the constraint: 'foo' is not among the allowed values.
    with self.assertRaises(errors.InvalidClaimError):
        decoded.validate()

    # Single-value form of the constraint: 'foo' != 'bar'.
    decoded.options = {'iss': {'value': 'bar'}}
    with self.assertRaises(errors.InvalidClaimError):
        decoded.validate()
def test_use_jws(self):
    """Encode with an RS256 header and decode with the public key.

    The encoded token must contain exactly two ``.`` separators, and the
    ``name`` claim must survive the round trip.
    """
    signing_key = read_file_path('rsa_private.pem')
    verifying_key = read_file_path('rsa_public.pem')

    token = jwt.encode({'alg': 'RS256'}, {'name': 'hi'}, signing_key)
    self.assertEqual(token.count(b'.'), 2)

    decoded = jwt.decode(token, verifying_key)
    self.assertEqual(decoded['name'], 'hi')
def test_use_jwe(self):
    """Encode with an RSA-OAEP / A256GCM header and decode with the private key.

    The encoded token must contain exactly four ``.`` separators, and the
    ``name`` claim must survive the round trip.
    """
    decrypting_key = read_file_path('rsa_private.pem')
    encrypting_key = read_file_path('rsa_public.pem')

    jwe_header = {'alg': 'RSA-OAEP', 'enc': 'A256GCM'}
    token = jwt.encode(jwe_header, {'name': 'hi'}, encrypting_key)
    self.assertEqual(token.count(b'.'), 4)

    decoded = jwt.decode(token, decrypting_key)
    self.assertEqual(decoded['name'], 'hi')
raise OauthNotConfiguredError('SP not configured')
else:
header = {'alg': self.signing_alg}
ts = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
expiration_ts = ts + datetime.timedelta(seconds=self.expiration_seconds)
payload = {
'iss': self.issuer,
'sub': subject,
'exp': expiration_ts,
'iat': ts,
'jti': uuid.uuid4().hex
}
return jwt.encode(header=header, payload=payload, key=self.signing_key), expiration_ts
payload['sub'] = subject
if not issued_at:
issued_at = int(time.time())
expires_in = kwargs.pop('expires_in', 3600)
if not expires_at:
expires_at = issued_at + expires_in
payload['iat'] = issued_at
payload['exp'] = expires_at
if claims:
payload.update(claims)
return jwt.encode(header, payload, key)