Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_reload():
"""Emulate what happens if you fetch keys from a remote site and you get back the same JWKS as the last time."""
_jwks = JWK0
kb = KeyBundle()
kb.imp_jwks = _jwks
kb.do_keys(kb.imp_jwks["keys"])
assert len(kb) == 1
kb.do_keys(kb.imp_jwks["keys"])
assert len(kb) == 1
raise RequirementsNotMet("No dynamic key handling")
r = urlparse(_uri)
# find the old key for this key usage and mark that as inactive
for kb in conv.client.keyjar.issuer_keys[""]:
for key in kb.keys():
if key.use in self.new_key["use"]:
key.inactive = True
kid = 0
# only one key
_nk = self.new_key
_typ = _nk["type"].upper()
if _typ == "RSA":
kb = KeyBundle(source="file://%s" % _nk["key"],
fileformat="der", keytype=_typ,
keyusage=_nk["use"])
else:
kb = {}
for k in kb.keys():
k.serialize()
k.kid = self.kid_template % kid
kid += 1
conv.client.kid[k.use][k.kty] = k.kid
conv.client.keyjar.add_kb("", kb)
dump_jwks(conv.client.keyjar[""], r.path[1:])
},
"request_method": "param"
}
SERVER_INFO = {
"version": "3.0",
"issuer": "https://connect-op.heroku.com",
"authorization_endpoint": "http://localhost:8088/authorization",
"token_endpoint": "http://localhost:8088/token",
"flows_supported": ["code", "token", "code token"],
}
CLIENT_SECRET = "abcdefghijklmnop"
CLIENT_ID = "client_1"
KC_SYM = KeyBundle([{"kty": "oct", "key": CLIENT_SECRET, "use": "ver"},
{"kty": "oct", "key": CLIENT_SECRET, "use": "sig"}])
KC_SYM2 = KeyBundle([{"kty": "oct", "key": "drickyoughurt", "use": "sig"},
{"kty": "oct", "key": "drickyoughurt", "use": "ver"}])
KC_RSA = keybundle_from_local_file("%s/rsa.key" % BASE_PATH,
"rsa", ["ver", "sig"])
KEYJAR = KeyJar()
KEYJAR[CLIENT_ID] = [KC_SYM, KC_RSA]
KEYJAR["number5"] = [KC_SYM2, KC_RSA]
KEYJAR[""] = KC_RSA
CDB = {
"number5": {
"password": "hemligt",
"client_secret": "drickyoughurt",
def export(self):
# has to be there
self.trace.info("EXPORT")
if self.client.keyjar is None:
self.client.keyjar = KeyJar()
kbl = []
kid_template = "a%d"
kid = 0
for typ, info in self.cconf["keys"].items():
kb = KeyBundle(source="file://%s" % info["key"], fileformat="der",
keytype=typ)
for k in kb.keys():
k.serialize()
k.kid = kid_template % kid
kid += 1
self.client.kid[k.use][k.kty] = k.kid
self.client.keyjar.add_kb("", kb)
kbl.append(kb)
try:
new_name = "static/jwks.json"
dump_jwks(kbl, new_name)
self.client.jwks_uri = "%s%s" % (self.cconf["_base_url"], new_name)
except KeyError:
from oic.oic.message import AuthorizationResponse
from oic.oic.message import IdToken
from oic.oic.message import OpenIDSchema
from oic.oic.message import ProviderConfigurationResponse
from oic.oic.message import RegistrationResponse
from oic.utils.authn.client import CLIENT_AUTHN_METHOD
from oic.utils.keyio import KeyBundle
from oic.utils.keyio import KeyJar
from oic.utils.keyio import keybundle_from_local_file
from oic.utils.sdb import DictSessionBackend
from oic.utils.sdb import session_get
from oic.utils.time_util import utc_time_sans_frac
__author__ = "rohe0002"
KC_SYM_VS = KeyBundle({"kty": "oct", "key": "abcdefghijklmnop", "use": "ver"})
KC_SYM_S = KeyBundle({"kty": "oct", "key": "abcdefghijklmnop", "use": "sig"})
BASE_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "data/keys"))
KC_RSA = keybundle_from_local_file(
os.path.join(BASE_PATH, "rsa.key"), "rsa", ["ver", "sig"]
)
SRVKEYS = KeyJar()
SRVKEYS[""] = [KC_RSA]
SRVKEYS["client_1"] = [KC_SYM_VS, KC_RSA]
CLIKEYS = KeyJar()
CLIKEYS["http://localhost:8088"] = [KC_RSA]
CLIKEYS[""] = [KC_RSA, KC_SYM_VS]
CLIKEYS["https://example.com"] = [KC_RSA]
def construct_jwks(_client, key_conf):
"""
Construct the jwks
"""
if _client.keyjar is None:
_client.keyjar = KeyJar()
kbl = []
kid_template = "a%d"
kid = 0
for typ, info in key_conf.items():
kb = KeyBundle(source="file://%s" % info["key"], fileformat="der",
keytype=typ)
for k in kb.keys():
k.serialize()
k.kid = kid_template % kid
kid += 1
_client.kid[k.use][k.kty] = k.kid
_client.keyjar.add_kb("", kb)
kbl.append(kb)
jwks = {"keys": []}
for kb in kbl:
# ignore simple keys
jwks["keys"].extend([k.to_dict()
for k in kb.keys() if k.kty != 'oct'])
def test_verify_token_encrypted_no_key():
idt = IdToken(
sub="553df2bcf909104751cfd8b2",
aud=["5542958437706128204e0000", "554295ce3770612820620000"],
auth_time=1441364872,
azp="554295ce3770612820620000",
)
kj = KeyJar()
kb = KeyBundle()
kb.do_local_der(
os.path.join(os.path.dirname(__file__), "data", "keys", "cert.key"),
"some",
["enc", "sig"],
)
kj.add_kb("", kb)
kj.add_kb("https://sso.qa.7pass.ctf.prosiebensat1.com", kb)
packer = JWT(
kj,
lifetime=3600,
iss="https://sso.qa.7pass.ctf.prosiebensat1.com",
encrypt=True,
)
_jws = packer.pack(**idt.to_dict())
msg = AuthorizationResponse(id_token=_jws)
from oic.federation import ClientMetadataStatement
from oic.federation.operator import Operator
from oic.utils.authn.authn_context import AuthnBroker
from oic.utils.authn.user import UserAuthnMethod
from oic.utils.authz import AuthzHandling
from oic.utils.keyio import KeyBundle
from oic.utils.keyio import KeyJar
from oic.utils.keyio import build_keyjar
from oic.utils.userinfo import UserInfo
BASE_PATH = os.path.abspath(
os.path.join(os.path.dirname(__file__), "data/keys"))
_key = rsa_load(os.path.join(BASE_PATH, "rsa.key"))
KC_RSA = KeyBundle({"key": _key, "kty": "RSA", "use": "sig"})
CLIENT_ID = "client_1"
KEYDEFS = [
{"type": "RSA", "key": '', "use": ["sig"]},
{"type": "EC", "crv": "P-256", "use": ["sig"]}
]
CONSUMER_CONFIG = {
"authz_page": "/authz",
"scope": ["openid"],
"response_type": ["code"],
"user_info": {
"name": None,
"email": None,
"nickname": None
pass
except Exception as err:
msg = "Failed to load client keys: {}"
logger.error(msg.format(sanitize(request.to_dict())))
logger.error("%s", err)
error = ClientRegistrationError(
error="invalid_configuration_parameter", error_description="%s" % err
)
return Response(
error.to_json(),
content="application/json",
status_code="400 Bad Request",
)
# Add the client_secret as a symmetric key to the keyjar
_kc = KeyBundle(
[
{"kty": "oct", "key": client_secret, "use": "ver"},
{"kty": "oct", "key": client_secret, "use": "sig"},
]
)
try:
self.keyjar[client_id].append(_kc)
except KeyError:
self.keyjar[client_id] = [_kc]
OAS.baseurl = config.baseurl
else:
if config.baseurl.endswith("/"):
config.baseurl = config.baseurl[:-1]
OAS.baseurl = "%s:%d" % (config.baseurl, args.port)
if not OAS.baseurl.endswith("/"):
OAS.baseurl += "/"
try:
OAS.keyjar[""] = []
kbl = []
for typ, info in config.keys.items():
typ = typ.upper()
LOGGER.info("OC server key init: %s, %s" % (typ, info))
kb = KeyBundle(source="file://%s" % info["key"], fileformat="der",
keytype=typ)
OAS.keyjar.add_kb("", kb)
kbl.append(kb)
try:
new_name = "static/jwks.json"
dump_jwks(kbl, new_name)
OAS.jwks_uri.append("%s%s" % (OAS.baseurl, new_name))
except KeyError:
pass
for b in OAS.keyjar[""]:
LOGGER.info("OC3 server keys: %s" % b)
except Exception, err:
LOGGER.error("Key setup failed: %s" % err)
OAS.key_setup("static", sig={"format": "jwk", "alg": "rsa"})