Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def user_info_request(self, method="GET", state="", scope="", **kwargs):
uir = self.message_factory.get_request_type("userinfo_endpoint")()
logger.debug("[user_info_request]: kwargs:%s" % (sanitize(kwargs),))
token = None # type: Optional[Token]
if "token" in kwargs:
if kwargs["token"]:
uir["access_token"] = kwargs["token"]
token = Token()
token.token_type = "Bearer"
token.access_token = kwargs["token"]
kwargs["behavior"] = "use_authorization_header"
else:
# What to do ? Need a callback
pass
elif "access_token" in kwargs and kwargs["access_token"]:
uir["access_token"] = kwargs["access_token"]
del kwargs["access_token"]
elif state:
token = self.grant[state].get_token(scope)
"response_type": self.behaviour["response_type"],
"scope": self.behaviour["scope"],
"state": session["state"],
"redirect_uri": self.registration_response["redirect_uris"][0],
}
if self.oidc:
session["nonce"] = rndstr(32)
request_args["nonce"] = session["nonce"]
if acr_value is not None:
request_args["acr_values"] = acr_value
request_args.update(kwargs)
cis = self.construct_AuthorizationRequest(request_args=request_args)
logger.debug("request: %s" % sanitize(cis))
url, body, ht_args, cis = self.uri_and_body(
AuthorizationRequest, cis, method="GET", request_args=request_args
)
self.authz_req[request_args["state"]] = cis
logger.debug("body: %s" % sanitize(body))
logger.info("URL: %s" % sanitize(url))
logger.debug("ht_args: %s" % sanitize(ht_args))
resp = Redirect(str(url))
if ht_args:
resp.headers.extend([(a, b) for a, b in ht_args.items()])
logger.debug("resp_headers: %s" % sanitize(resp.headers))
return resp
perm_set = session.get("permission")
if perm_set:
uic = {key: uic[key] for key in uic if key in perm_set}
if "oidreq" in session:
uic = self.server.update_claims(session, "oidreq", "userinfo", uic)
else:
uic = self.server.update_claims(session, "authzreq", "userinfo", uic)
if uic:
userinfo_claims = Claims(**uic)
else:
userinfo_claims = None
logger.debug("userinfo_claim: %s" % sanitize(userinfo_claims.to_dict()))
logger.debug("Session info: %s" % sanitize(session))
if "authn_event" in session:
uid = AuthnEvent.from_json(session["authn_event"]).uid
else:
uid = session["uid"]
info = self.userinfo(uid, session["client_id"], userinfo_claims)
if "sub" in userinfo_claims:
if not claims_match(session["sub"], userinfo_claims["sub"]):
raise FailedAuthentication("Unmatched sub claim")
info["sub"] = session["sub"]
logger.debug("user_info_response: %s", info)
return info
def read_registration(self, authn, request, **kwargs):
"""
Read all information this server has on a client.
Authorization is done by using the access token that was return as
part of the client registration result.
:param authn: The Authorization HTTP header
:param request: The query part of the URL
:param kwargs: Any other arguments
:return:
"""
logger.debug("authn: %s, request: %s" % (sanitize(authn), sanitize(request)))
# verify the access token, has to be key into the client information
# database.
if not authn.startswith("Bearer "):
return error_response("invalid_request")
token = authn[len("Bearer ") :]
# Get client_id from request
_info = parse_qs(request)
cid = _info.get("client_id")
if cid is None:
return Unauthorized()
client_id = cid[0]
cdb_entry = self.cdb.get(client_id)
if cdb_entry is None:
if key in what:
try:
cpoints[_srv].append(key)
except KeyError:
cpoints[_srv] = [key]
try:
remaining.remove(key)
except ValueError:
pass
if remaining:
raise MissingAttribute("Missing properties '%s'" % remaining)
for srv, what in cpoints.items():
cc = self.oidcsrv.claims_clients[srv]
logger.debug("srv: %s, what: %s" % (sanitize(srv), sanitize(what)))
_res = self._collect_distributed(srv, cc, userid, what)
logger.debug("Got: %s" % sanitize(_res))
for key, val in _res.items():
if key in result:
result[key].update(val)
else:
result[key] = val
else:
# default is what "openid" demands which is sub
result = {"sub": userid}
return OpenIDSchema(**result)
atresp = self.do_access_token_request(
state=authresp["state"],
request_args=args,
authn_method=self.registration_response[
"token_endpoint_auth_method"
],
)
msg = "Access token response: {}"
logger.info(msg.format(sanitize(atresp)))
except Exception as err:
logger.error("%s" % err)
raise
if isinstance(atresp, ErrorResponse):
msg = "Error response: {}"
self._err(msg.format(sanitize(atresp.to_dict())))
_token = atresp["access_token"]
try:
_id_token = atresp["id_token"]
except KeyError:
pass
else:
_token = authresp["access_token"]
if not self.oidc:
return {"access_token": _token}
if _id_token is None:
self._err("Invalid response: no IdToken")
def _err(self, txt):
logger.error(sanitize(txt))
raise OAuth2Error(txt)
try:
_idtoken = self.sign_encrypt_id_token(
_info, client_info, areq, user_info=userinfo
)
except (JWEException, NoSuitableSigningKeys) as err:
logger.warning(str(err))
return error_response(
"invalid_request", descr="Could not sign/encrypt id_token"
)
_sdb.update_by_token(_access_code, "id_token", _idtoken)
# Refresh the _tinfo
_tinfo = _sdb[_access_code]
_log_debug("_tinfo: %s" % sanitize(_tinfo))
response_cls = self.server.message_factory.get_response_type("token_endpoint")
atr = response_cls(**by_schema(response_cls, **_tinfo))
logger.info("access_token_response: %s" % sanitize(atr.to_dict()))
return Response(
atr.to_json(), content="application/json", headers=OAUTH2_NOCACHE_HEADERS
)
pass
request_args.update(kwargs)
cis = self.construct_AuthorizationRequest(request_args=request_args)
logger.debug("request: %s" % sanitize(cis))
url, body, ht_args, cis = cast(
AuthorizationRequest,
self.uri_and_body(
AuthorizationRequest, cis, method="GET", request_args=request_args
),
)
self.authz_req[request_args["state"]] = cis
logger.debug("body: %s" % sanitize(body))
logger.info("URL: %s" % sanitize(url))
logger.debug("ht_args: %s" % sanitize(ht_args))
resp = Redirect(str(url))
if ht_args:
resp.headers.extend([(a, b) for a, b in ht_args.items()])
logger.debug("resp_headers: %s" % sanitize(resp.headers))
return resp
cookie: Optional[Union[str, SimpleCookie]] = None,
**kwargs
) -> Response:
"""
Handle a RP initiated Logout request.
:param request: The logout request
:param cookie:
:param kwargs:
:return: Returns a dictionary with one key 'sjwt' and the value
being a signed JWT token with session information.
"""
_req = self.server.message_factory.get_request_type("endsession_endpoint")
esr = _req().from_urlencoded(request)
logger.debug("End session request: %s", sanitize(esr.to_dict()))
if self.events:
self.events.store("protocol request", esr)
# 2 ways of find out client ID and user. Either through a cookie
# or using the id_token_hint. If I get information from both make sure they match
_, client_id, uid = self._get_uid_from_cookie(cookie)
if uid is not None:
client_ids = self.sdb.get_client_ids_for_uid(uid)
if client_id not in client_ids:
return error_response("invalid_request", "Wrong user")
sid = ""
if "id_token_hint" in esr: