Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_verify_id_token_iss_not_in_keyjar():
idt = IdToken(
**{
"sub": "553df2bcf909104751cfd8b2",
"aud": ["5542958437706128204e0000", "554295ce3770612820620000"],
"auth_time": 1441364872,
"azp": "554295ce3770612820620000",
}
)
kj = KeyJar()
kj.add_symmetric("", "dYMmrcQksKaPkhdgRNYk3zzh5l7ewdDJ", ["sig"])
kj.add_symmetric(
"https://sso.qa.7pass.ctf.prosiebensat1.com",
"dYMmrcQksKaPkhdgRNYk3zzh5l7ewdDJ",
["sig"],
)
packer = JWT(kj, sign_alg="HS256", lifetime=3600, iss="https://example.com/op")
areq = AuthorizationRequest(
response_type="code",
client_id="client_1",
redirect_uri="http://example.com/authz",
scope=["openid"],
state="state000",
)
sdb = self.provider.sdb
ae = AuthnEvent("userX", "salt")
sid = sdb.create_authz_session(ae, areq)
sdb.do_sub(sid, "client_salt")
_info = sdb[sid]
# All this is jut removed when the id_token is constructed
# The proper information comes from the session information
_user_info = IdToken(
iss="https://foo.example.om",
sub="foo",
aud=bib["client_id"],
exp=epoch_in_a_while(minutes=10),
acr="2",
nonce=bib["nonce"],
)
idt = self.provider.id_token_as_signed_jwt(
_info, access_token="access_token", user_info=_user_info
)
req["id_token"] = idt
query_string = req.to_urlencoded()
# client_id not in id_token["aud"] so login required
KC_SYM_S = KeyBundle(
{"kty": "oct", "key": "abcdefghijklmnop".encode("utf-8"), "use": "sig",
"alg": "HS256"})
BASE_PATH = os.path.abspath(
os.path.join(os.path.dirname(__file__), os.pardir, os.pardir,
"data/keys"))
_key = rsa_load(os.path.join(BASE_PATH, "rsa.key"))
KC_RSA = KeyBundle({"key": _key, "kty": "RSA", "use": "sig"})
KEYJ = KeyJar()
KEYJ[""] = [KC_RSA, KC_SYM_S]
KEYJ["client_1"] = [KC_RSA, KC_SYM_S]
CLIENT_ID = "client_1"
IDTOKEN = IdToken(iss="http://oic.example.org/", sub="sub",
aud=CLIENT_ID, exp=utc_time_sans_frac() + 86400,
nonce="N0nce",
iat=time.time())
# ----------------- CLIENT --------------------
class TestClient(object):
@pytest.fixture(autouse=True)
def create_client(self):
self.redirect_uri = "http://example.com/redirect"
self.client = Client(CLIENT_ID, client_authn_method=CLIENT_AUTHN_METHOD)
self.client.redirect_uris = [self.redirect_uri]
self.client.authorization_endpoint = "http://example.com/authorization"
self.client.token_endpoint = "http://example.com/token"
self.client.userinfo_endpoint = "http://example.com/userinfo"
def test_verify_id_token_reject_wrong_aud(self, monkeypatch):
issuer = "https://provider.example.com"
monkeypatch.setattr(self.client, "provider_info", {"issuer": issuer})
id_token = IdToken(**dict(iss=issuer, aud=["nobody"]))
with pytest.raises(OtherError) as exc:
self.client._verify_id_token(id_token)
assert "me" in str(exc.value)
def test_verify_id_token_missing_iss():
idt = IdToken(
**{
"sub": "553df2bcf909104751cfd8b2",
"aud": ["5542958437706128204e0000", "554295ce3770612820620000"],
"auth_time": 1441364872,
"azp": "554295ce3770612820620000",
}
)
kj = KeyJar()
kj.add_symmetric("", "dYMmrcQksKaPkhdgRNYk3zzh5l7ewdDJ", ["sig"])
kj.add_symmetric(
"https://sso.qa.7pass.ctf.prosiebensat1.com",
"dYMmrcQksKaPkhdgRNYk3zzh5l7ewdDJ",
["sig"],
)
packer = JWT(kj, sign_alg="HS256", lifetime=3600)
def test_verify_token_encrypted_no_key():
idt = IdToken(
sub="553df2bcf909104751cfd8b2",
aud=["5542958437706128204e0000", "554295ce3770612820620000"],
auth_time=1441364872,
azp="554295ce3770612820620000",
)
kj = KeyJar()
kb = KeyBundle()
kb.do_local_der(
os.path.join(os.path.dirname(__file__), "data", "keys", "cert.key"),
"some",
["enc", "sig"],
)
kj.add_kb("", kb)
kj.add_kb("https://sso.qa.7pass.ctf.prosiebensat1.com", kb)
packer = JWT(
def test_check_session_endpoint():
server = srv_init
print server.name
server.keystore.add_key(CDB["number5"]["client_secret"], "hmac", "verify",
"number5")
session = {"user_id": "UserID", "client_id": "number5"}
idtoken = server._id_token(session)
csr = CheckSessionRequest(id_token=idtoken)
environ = BASE_ENVIRON.copy()
environ["QUERY_STRING"] = csr.get_urlencoded()
info = server.check_session_endpoint(environ, start_response, LOG())
print info
idt = IdToken.set_json(info[0])
print idt.keys()
assert _eq(idt.keys(), ['user_id', 'aud', 'iss', 'acr', 'exp'])
assert idt.iss == server.name
def test_id_token():
idt = IdToken(**{
"sub": "553df2bcf909104751cfd8b2",
"aud": [
"5542958437706128204e0000",
"554295ce3770612820620000"
],
"auth_time": 1441364872,
"azp": "554295ce3770612820620000",
"at_hash": "L4Ign7TCAD_EppRbHAuCyw",
"iat": 1441367116,
"exp": 1441374316,
"iss": "https://sso.qa.7pass.ctf.prosiebensat1.com"
})
idt.verify()
def _parse_id_token(self, id_token, redirect_uri):
try:
return IdToken().from_jwt(id_token, keyjar=self.keyjar)
except Exception as err:
logger.error("Faulty id_token: %s" % id_token)
logger.error("Exception: %s" % (err.__class__.__name__,))
id_token = IdToken().from_jwt(id_token, verify=False)
logger.error("IdToken: %s" % id_token.to_dict())
return redirect_authz_error("invalid_id_token_object", redirect_uri)