Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_provider(self):
kb = KeyBundle(JWKS["keys"])
kj = KeyJar()
kj.issuer_keys[""] = [kb]
_sdb = SessionDB(
"https://example.com/",
db=DictSessionBackend(),
code_factory=DefaultToken(
"supersecret", "verybadpassword", typ="A", lifetime=600
),
token_factory=JWTToken(
"T",
keyjar=kj,
lt_pattern={"code": 3600, "token": 900},
iss="https://example.com/as",
sign_alg="RS256",
),
refresh_token_factory=JWTToken(
def fo_member(*args):
_kj = KeyJar()
for fo in args:
_kj.import_jwks(fo.jwks, fo.iss)
return Operator(fo_keyjar=_kj)
def export_keys(keys):
kbl = []
keyjar = KeyJar()
for typ, info in keys.items():
kb = KeyBundle(source="file://%s" % info["key"], fileformat="der",
keytype=typ)
keyjar.add_kb("", kb)
kbl.append(kb)
try:
new_name = "static/jwks.json"
dump_jwks(kbl, new_name)
except KeyError:
pass
return keyjar
def test_get_inactive_sig(self):
"""get_signing_key cannot return inactive `sig` key."""
ks = KeyJar()
ks["http://example.com"] = KeyBundle(
[{"kty": "oct", "key": "a1b2c3d4", "use": "sig"}]
)
ks["http://example.com"][0]._keys[0].inactive_since = 1
key = ks.get_signing_key(owner="http://example.com")
assert len(key) == 0
def test_recuperate_jwks(self):
self.provider.keyjar = KeyJar() # Empty keyjar, all keys are lost
with open(os.path.join(BASE_PATH, "jwk_enc.json")) as keyf:
key = keyf.read()
info = {
"id_token_encrypted_response_alg": "A128KW",
"id_token_encrypted_response_enc": "A128CBC-HS256",
"client_secret": "some_secret",
"jwks": json.loads(key),
}
self.provider.recuperate_keys("some_client", info)
assert len(self.provider.keyjar.get_issuer_keys("some_client")) == 3
def test_pkce_token():
kb = KeyBundle(JWKS["keys"])
kj = KeyJar()
kj.issuer_keys[""] = [kb]
constructor = JWTToken(
"A",
keyjar=kj,
lt_pattern={"": 900},
iss="https://example.com/as",
sign_alg="RS256",
encrypt=True,
)
sid = rndstr(32)
session_info = {
"sub": "subject_id",
"client_id": "https://example.com/rp",
"response_type": ["code"],
"authzreq": "{}",
name,
sdb,
cdb,
None,
userinfo,
None,
client_authn,
None,
urlmap,
keyjar,
hostname,
verify_ssl=verify_ssl,
)
if keyjar is None:
keyjar = KeyJar(verify_ssl=verify_ssl)
for cid, _dic in cdb.items():
try:
keyjar.add_symmetric(cid, _dic["client_secret"], ["sig", "ver"])
except KeyError:
pass
self.srvmethod = OICCServer(keyjar=keyjar)
self.dist_claims_mode = dist_claims_mode
self.info_store = {} # type: Dict[str, Any]
self.claims_userinfo_endpoint = ""
}
}
USERDB = {
"username": {
"name": "Linda Lindgren",
"nickname": "Linda",
"email": "linda@example.com",
"verified": True,
"sub": "username"
}
}
PROVIDER_RSA = keybundle_from_local_file(
"/Users/regu0004/dev/pyoidc/tests/data/keys/cert.key",
"RSA", ["ver", "sig"])
PROVIDER_KEYJAR = KeyJar()
PROVIDER_KEYJAR[""] = PROVIDER_RSA
CLIENT_RSA = keybundle_from_local_file(
"/Users/regu0004/dev/pyoidc/tests/data/keys/rsa.key",
"RSA", ["ver", "sig"])
CLIENT_KEYJAR = KeyJar()
CLIENT_KEYJAR[""] = CLIENT_RSA
class DummyAuthn(UserAuthnMethod):
def __init__(self, srv, user):
UserAuthnMethod.__init__(self, srv)
self.user = user
def authenticated_as(self, cookie=None, **kwargs):
if cookie == "FAIL":
def __setitem__(self, key, value):
"""
:param key: issuer ID
:param value: Supposed to be KeyJar or a JWKS (JSON document)
"""
if not isinstance(value, KeyJar):
kj = KeyJar()
kj.import_jwks(value, issuer=key)
value = kj
else:
_val = value.copy()
_iss = list(_val.keys())
if _iss == ['']:
_val.issuer_keys[key] = _val.issuer_keys['']
del _val.issuer_keys['']
elif len(_iss) == 1:
if _iss[0] != key:
_val.issuer_keys[key] = _val.issuer_keys[_iss[0]]
del _val.issuer_keys[_iss[0]]
else:
raise ValueError('KeyJar contains to many issuers')
)
self.provider_info = pcr
else:
_pcr_issuer = issuer
self.issuer = _pcr_issuer
if endpoints:
for key, val in pcr.items():
if key.endswith("_endpoint"):
setattr(self, key, val)
if keys:
if self.keyjar is None:
self.keyjar = KeyJar()
self.keyjar.load_keys(pcr, _pcr_issuer)