Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
key = SYMKey(key="TestPassword")
_signed_jwt = idts.to_jwt(key=[key], algorithm="HS256")
# Mess with the signed id_token
p = _signed_jwt.split(".")
p[2] = "aaa"
_faulty_signed_jwt = ".".join(p)
_info = {
"access_token": "accessTok",
"id_token": _faulty_signed_jwt,
"token_type": "Bearer",
"expires_in": 3600,
}
at = AccessTokenResponse(**_info)
with pytest.raises(BadSignature):
at.verify(key=[key])
idval = {'nonce': 'KUEYfRM2VzKDaaKD', 'sub': 'EndUserSubject',
'iss': 'https://alpha.cloud.nds.rub.de', 'exp': 1420823073,
'iat': 1420822473, 'aud': 'TestClient'}
idts = IdToken(**idval)
key = SYMKey(key="TestPassword")
_signed_jwt = idts.to_jwt(key=[key], algorithm="HS256")
# Mess with the signed id_token
p = _signed_jwt.split(".")
p[2] = "aaa"
_faulty_signed_jwt = ".".join(p)
_info = {"access_token": "accessTok", "id_token": _faulty_signed_jwt,
"token_type": "Bearer", "expires_in": 3600}
at = AccessTokenResponse(**_info)
with pytest.raises(BadSignature):
at.verify(key=[key])
def test_faulty_id_token_in_access_token_response(self):
c = Consumer(None, None)
c.keyjar.add_symmetric("", "TestPassword", ["sig"])
_info = {
"access_token": "accessTok",
"id_token": self._faulty_id_token(),
"token_type": "Bearer",
}
_json = json.dumps(_info)
with pytest.raises(ValueError):
c.parse_response(AccessTokenResponse, _json, sformat="json")
def test_construct_UserInfoRequest_2_with_token(self):
self.client.grant["foo"] = Grant()
self.client.grant["foo"].grant_expiration_time = int(time.time() + 60)
self.client.grant["foo"].code = "access_code"
resp = AccessTokenResponse(
refresh_token="refresh_with_me",
access_token="access",
id_token="IDTOKEN",
scope=["openid"],
)
self.client.grant["foo"].tokens.append(Token(resp))
uir = self.client.construct_UserInfoRequest(state="foo", scope=["openid"])
assert uir["access_token"] == "access"
http_response = frontend_with_extra_scopes.handle_authn_response(
context, internal_response
)
authn_resp = AuthorizationResponse().deserialize(urlparse(http_response.message).fragment, "urlencoded")
assert "code" in authn_resp
assert "access_token" in authn_resp
assert "id_token" in authn_resp
# token request
context.request = AccessTokenRequest(redirect_uri=authn_req["redirect_uri"], code=authn_resp["code"]).to_dict()
credentials = "{}:{}".format(registration_response["client_id"], registration_response["client_secret"])
basic_auth = urlsafe_b64encode(credentials.encode("utf-8")).decode("utf-8")
context.request_authorization = "Basic {}".format(basic_auth)
http_response = frontend_with_extra_scopes.token_endpoint(context)
parsed = AccessTokenResponse().deserialize(http_response.message, "json")
assert "access_token" in parsed
assert "id_token" in parsed
# userinfo request
context.request = {}
context.request_authorization = "Bearer {}".format(parsed["access_token"])
http_response = frontend_with_extra_scopes.userinfo_endpoint(context)
parsed = OpenIDSchema().deserialize(http_response.message, "json")
assert "email" in parsed
assert "eduperson_principal_name" in parsed
assert "eduperson_scoped_affiliation" in parsed
]
)
try:
del _dict["refresh_token"]
except KeyError:
pass
if "id_token" in req["response_type"]:
_idt = self.make_id_token(
_info, issuer=self.name, access_token=_dict["access_token"]
)
alg = "RS256"
ckey = self.keyjar.get_signing_key(alg2keytype(alg), _info["client_id"])
_dict["id_token"] = _idt.to_jwt(key=ckey, algorithm=alg)
resp = AccessTokenResponse(**_dict)
location = resp.request(req["redirect_uri"])
response = Response()
response.headers = {"location": location}
response.status_code = 302
response.text = ""
return response
def get_id_tokens(conv):
res = []
# In access token responses
for inst, msg in get_protocol_response(conv, message.AccessTokenResponse):
_dict = json.loads(msg)
jwt = _dict["id_token"]
idt = inst["id_token"]
res.append((idt, jwt))
# implicit, id_token in authorization response
for inst, msg in get_protocol_response(conv, message.AuthorizationResponse):
try:
idt = inst["id_token"]
except KeyError:
pass
else:
_info = urlparse.parse_qs(msg)
jwt = _info["id_token"][0]
res.append((idt, jwt))
self.name = name
for param in ["client_id", "client_secret"]:
try:
setattr(self, param, kwargs[param])
del kwargs[param]
except KeyError:
setattr(self, param, "")
self.extra = kwargs
try:
self.srv_discovery_url = kwargs["srv_discovery_url"]
except KeyError:
self.srv_discovery_url = None
self.flow_type = FLOW_TYPE
self.access_token_response = AccessTokenResponse
self.client_cls = oic.Client
self.authn_method = None
self.registration_info = registration_info
self.client_secret = ""
for param in ["client_id", "client_secret"]:
try:
setattr(self, param, kwargs[param])
del kwargs[param]
except KeyError:
pass
self.extra = kwargs
try:
self.srv_discovery_url = kwargs["srv_discovery_url"]
except KeyError:
self.srv_discovery_url = None
self.flow_type = FLOW_TYPE
self.access_token_response = AccessTokenResponse
self.client_cls = oic.Client
self.authn_method = None
self.registration_info = registration_info