Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_should_load_keys_from_jwk_data_dict(self):
    """Check that a JWK set built from a plain dict exposes an RSA key.

    The public key fixture is loaded through ``RSAAlgorithm.from_jwk``,
    serialised back with ``to_jwk``, decorated with the ``alg``/``use``/
    ``kid`` members, and finally handed to ``PyJWKSet.from_dict``.
    """
    algo = RSAAlgorithm(RSAAlgorithm.SHA256)

    with open(key_path("jwk_rsa_pub.json"), "r") as keyfile:
        pub_key = algo.from_jwk(keyfile.read())

    # Round-trip through the JWK JSON string to obtain a mutable dict.
    key_data_str = algo.to_jwk(pub_key)
    key_data = json.loads(key_data_str)

    # TODO Should `to_jwk` set these?
    key_data["alg"] = "RS256"
    key_data["use"] = "sig"
    key_data["kid"] = "keyid-abc123"

    jwk_set = PyJWKSet.from_dict({"keys": [key_data]})
    jwk = jwk_set.keys[0]

    assert jwk.key_type == "RSA"
config = r.json()
except Exception as e:
raise ApiError('Could not get OpenID configuration from well known URL: {}'.format(str(e)), 503)
if 'issuer' not in config:
error = config.get('error') or config.get('message') or config
raise ApiError('OpenID Connect issuer response invalid: {}'.format(error))
if config['issuer'].format(tenantid=app.config['AZURE_TENANT']) != issuer_url:
raise ApiError('Issuer Claim does not match Issuer URL used to retrieve OpenID configuration', 503)
if app.config['OIDC_VERIFY_TOKEN']:
try:
jwks_uri = config['jwks_uri']
r = requests.get(jwks_uri, timeout=2)
keys = {k['kid']: RSAAlgorithm.from_jwk(json.dumps(k)) for k in r.json()['keys']}
except Exception as e:
raise ApiError('Could not get OpenID JWT Key Set from JWKS URL: {}'.format(str(e)), 503)
else:
keys = {}
return config, keys
def _jwt_rs1_signing_algorithm():
    """Return the shared RSA/SHA-1 signing algorithm, creating it on first use.

    The instance is stored in the module-level ``_jwtrs1`` variable so that
    ``jwt.algorithms`` is imported, and the algorithm object built, at most
    once and only when it is actually needed.

    Returns:
        The cached ``jwt.algorithms.RSAAlgorithm`` built with ``hashes.SHA1``.
    """
    global _jwtrs1
    if _jwtrs1 is None:
        # Deferred import: keeps the dependency off the module import path.
        import jwt.algorithms as jwtalgo

        # NOTE(review): relies on ``jwt.algorithms`` exposing ``hashes``;
        # confirm this holds for the installed PyJWT version.
        _jwtrs1 = jwtalgo.RSAAlgorithm(jwtalgo.hashes.SHA1)
    return _jwtrs1
def _jwt_rs1_signing_algorithm():
    """Return the shared RSA/SHA-1 signing algorithm, creating it on first use.

    The instance is stored in the module-level ``_jwtrs1`` variable so that
    ``jwt.algorithms`` is imported, and the algorithm object built, at most
    once and only when it is actually needed.

    Returns:
        The cached ``jwt.algorithms.RSAAlgorithm`` built with ``hashes.SHA1``.
    """
    global _jwtrs1
    if _jwtrs1 is None:
        # Deferred import: keeps the dependency off the module import path.
        import jwt.algorithms as jwtalgo

        # NOTE(review): relies on ``jwt.algorithms`` exposing ``hashes``;
        # confirm this holds for the installed PyJWT version.
        _jwtrs1 = jwtalgo.RSAAlgorithm(jwtalgo.hashes.SHA1)
    return _jwtrs1
if key["kid"] == unverified_header["kid"]:
rsa_key = {
"kty": key["kty"],
"kid": key["kid"],
"use": key["use"],
"n": key["n"],
"e": key["e"]
}
break
if rsa_key:
try:
id_token = jwt.decode(
access_token,
# TODO: this is stupid: we convert rsa_key to JWT JSON only to produce the public key JSON string
RSAAlgorithm.from_jwk(json.dumps(rsa_key)),
algorithms=auth_config.algorithms,
audience=auth_config.audience,
issuer=auth_config.issuer
)
except jwt.ExpiredSignatureError:
raise ServiceAuthError("Token expired",
log_message="Token is expired")
except jwt.InvalidTokenError:
raise ServiceAuthError("Invalid claims",
log_message="Incorrect claims, please check the audience and issuer")
except Exception:
raise ServiceAuthError("Invalid header",
log_message="Unable to parse authentication token.")
return id_token
raise ServiceAuthError("Invalid header",
def _jwt_rs1_signing_algorithm():
    """Return the shared RSA/SHA-1 signing algorithm, creating it on first use.

    The instance is stored in the module-level ``_jwtrs1`` variable so that
    ``jwt.algorithms`` is imported, and the algorithm object built, at most
    once and only when it is actually needed.

    Returns:
        The cached ``jwt.algorithms.RSAAlgorithm`` built with ``hashes.SHA1``.
    """
    global _jwtrs1
    if _jwtrs1 is None:
        # Deferred import: keeps the dependency off the module import path.
        import jwt.algorithms as jwtalgo

        # NOTE(review): relies on ``jwt.algorithms`` exposing ``hashes``;
        # confirm this holds for the installed PyJWT version.
        _jwtrs1 = jwtalgo.RSAAlgorithm(jwtalgo.hashes.SHA1)
    return _jwtrs1
def get_latest_public_key(sso_url, realm, timeout=10):
    """Fetch a realm's first published JWK and return it as a PEM public key.

    The realm's OpenID Connect discovery document is requested first; its
    ``jwks_uri`` entry is then fetched and the first key in the returned
    ``keys`` list is converted to PEM.

    Args:
        sso_url: Base URL of the SSO server, without a trailing slash.
        realm: Realm name inserted into the discovery URL.
        timeout: Seconds to wait on each HTTP request before giving up.
            Without a timeout a stalled server would block the caller
            indefinitely.

    Returns:
        The key serialised as PEM-encoded ``SubjectPublicKeyInfo`` bytes.

    Raises:
        requests.exceptions.RequestException: On connection failure or
            timeout of either request.
        KeyError: If ``jwks_uri`` or ``keys`` is missing from a response.
        IndexError: If the key set is empty.
    """
    url = "%s/auth/realms/%s/.well-known/openid-configuration" % (sso_url, realm)
    jwks_uri = requests.get(url, timeout=timeout).json()["jwks_uri"]
    jwks = requests.get(jwks_uri, timeout=timeout).json()["keys"]

    # NOTE(review): the first entry is used unconditionally; if the server
    # publishes several keys, confirm that index 0 is the intended one.
    public_key = RSAAlgorithm.from_jwk(json.dumps(jwks[0]))
    return public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )