Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def auth_init(self, request):
    """
    Run the parent class' auth_init, turning InvalidRequest into an error response.

    Overridden because the parent implementation (via filter_request) can
    throw an InvalidRequest.

    :param request: The request, handed unchanged to the parent's auth_init
    :return: The parent's result, or an "invalid_request" error response
        carrying the exception text when InvalidRequest was raised
    """
    try:
        outcome = super().auth_init(request)
    except InvalidRequest as exc:
        # Report the problem to the caller instead of letting it propagate.
        return error_response("invalid_request", str(exc))
    return outcome
def delete_registration(self, authn, request, **kwargs):
    """
    Refuse a request to delete the client info on server side.

    Deleting a registration is not supported: none of the arguments are
    inspected and the same error response is produced on every call.

    :param authn: Authorization HTTP header (not used)
    :param request: Query part of the request (not used)
    :param kwargs: Any other arguments (not used)
    :return: The "Unsupported operation" error response built by
        error_response with status code 403
    """
    reason = "Deletion of the registration is not supported"
    return error_response("Unsupported operation", descr=reason, status_code=403)
algo = client_info["userinfo_signed_response_alg"]
except KeyError: # Fall back to default
algo = self.jwx_def["signing_alg"]["userinfo"]
if algo == "none":
key = [] # type: List[KEYS]
else:
if algo.startswith("HS"):
key = self.keyjar.get_signing_key(
alg2keytype(algo), client_info["client_id"], alg=algo
)
else:
# Use my key for signing
key = self.keyjar.get_signing_key(alg2keytype(algo), "", alg=algo)
if not key:
return error_response("invalid_request", descr="Missing signing key")
jinfo = userinfo.to_jwt(key, algo)
if "userinfo_encrypted_response_alg" in client_info:
# encrypt with clients public key
jinfo = self.encrypt(
jinfo, client_info, session["client_id"], "userinfo", "JWT"
)
return jinfo
RFC6749 section 4.1
"""
_sdb = self.sdb
_log_debug = logger.debug
client_info = self.cdb[str(areq["client_id"])]
try:
_access_code = areq["code"].replace(" ", "+")
except KeyError: # Missing code parameter - absolutely fatal
return error_response("invalid_request", descr="Missing code")
# assert that the code is valid
if self.sdb.is_revoked(_access_code):
return error_response("invalid_request", descr="Token is revoked")
# Session might not exist or _access_code malformed
try:
_info = _sdb[_access_code]
except KeyError:
return error_response("invalid_request", descr="Code is invalid")
# If redirect_uri was in the initial authorization request verify that it is here as well
# Mismatch would raise in oic.oauth2.provider.Provider.token_endpoint
if "redirect_uri" in _info and "redirect_uri" not in areq:
return error_response("invalid_request", descr="Missing redirect_uri")
_log_debug("All checks OK")
issue_refresh = False
permissions = _info.get("permission", ["offline_access"]) or ["offline_access"]
headers.append(cookie_header)
# Now about the response_mode. Should not be set if it's obvious
# from the response_type. Knows about 'query', 'fragment' and
# 'form_post'.
if "response_mode" in areq:
try:
resp = self.response_mode(
areq,
fragment_enc,
aresp=aresp,
redirect_uri=redirect_uri,
headers=headers,
)
except InvalidRequest as err:
return error_response("invalid_request", str(err))
else:
if resp is not None:
return resp
return aresp, headers, redirect_uri, fragment_enc
Read all information this server has on a client.
Authorization is done by using the access token that was returned as
part of the client registration result.
:param authn: The Authorization HTTP header
:param request: The query part of the URL
:param kwargs: Any other arguments
:return:
"""
logger.debug("authn: %s, request: %s" % (sanitize(authn), sanitize(request)))
# verify the access token, has to be key into the client information
# database.
if not authn.startswith("Bearer "):
return error_response("invalid_request")
token = authn[len("Bearer ") :]
# Get client_id from request
_info = parse_qs(request)
cid = _info.get("client_id")
if cid is None:
return Unauthorized()
client_id = cid[0]
cdb_entry = self.cdb.get(client_id)
if cdb_entry is None:
return Unauthorized()
reg_token = cdb_entry.get("registration_access_token", "")
if not safe_str_cmp(reg_token, token):
return Unauthorized()
"invalid_token", descr="Invalid Token", status_code=401
)
_log_debug("access_token type: '%s'" % (typ,))
if typ != "T":
logger.error("Wrong token type: {}".format(typ))
raise FailedAuthentication("Wrong type of token")
if _sdb.access_token.is_expired(token):
return error_response(
"invalid_token", descr="Token is expired", status_code=401
)
if _sdb.is_revoked(key):
return error_response(
"invalid_token", descr="Token is revoked", status_code=401
)
session = _sdb[key]
# Scope can translate to userinfo_claims
info = self.schema(**self._collect_user_info(session))
# Should I return a JSON or a JWT ?
_cinfo = self.cdb.get(session["client_id"])
if _cinfo is None:
return error_response("unauthorized_client", descr="Unknown client")
try:
if "userinfo_signed_response_alg" in _cinfo:
# Will also encrypt if defined in cinfo
jinfo = self.signed_userinfo(_cinfo, info, session)
return error_response("invalid_request", descr="Missing redirect_uri")
_log_debug("All checks OK")
issue_refresh = False
permissions = _info.get("permission", ["offline_access"]) or ["offline_access"]
if "offline_access" in _info["scope"] and "offline_access" in permissions:
issue_refresh = True
try:
_tinfo = _sdb.upgrade_to_token(_access_code, issue_refresh=issue_refresh)
except AccessCodeUsed as err:
logger.error("%s" % err)
# Should revoke the token issued to this access code
_sdb.revoke_all_tokens(_access_code)
return error_response("access_denied", descr="Access Code already used")
if "openid" in _info["scope"]:
userinfo = self.userinfo_in_id_token_claims(_info)
try:
_idtoken = self.sign_encrypt_id_token(
_info, client_info, areq, user_info=userinfo
)
except (JWEException, NoSuitableSigningKeys) as err:
logger.warning(str(err))
return error_response(
"invalid_request", descr="Could not sign/encrypt id_token"
)
_sdb.update_by_token(_access_code, "id_token", _idtoken)
# Refresh the _tinfo
def client_registration_setup(self, request):
try:
request.verify()
except MessageException as err:
if "type" not in request:
return error_response("invalid_type", descr="%s" % err)
else:
return error_response(
"invalid_configuration_parameter", descr="%s" % err
)
request.rm_blanks()
try:
self.match_client_request(request)
except CapabilitiesMisMatch as err:
return error_response(
"invalid_request", descr="Don't support proposed %s" % err
)
# create new id and secret
client_id = rndstr(12)
while client_id in self.cdb:
except KeyError:
# If no response_type is registered by the client then we'll use
# code, which is the default according to the OIDC spec.
_registered = [{"code"}]
_wanted = set(areq["response_type"])
if _wanted not in _registered:
return error_response(
"invalid_request", "Trying to use unregistered response_typ"
)
logger.debug("AuthzRequest: %s" % (sanitize(areq.to_dict()),))
try:
redirect_uri = self.get_redirect_uri(areq)
except (RedirectURIError, ParameterError, UnknownClient) as err:
return error_response(
"invalid_request", "{}:{}".format(err.__class__.__name__, err)
)
try:
keyjar = self.keyjar
except AttributeError:
keyjar = ""
try:
# verify that the request message is correct
areq.verify(keyjar=keyjar, opponent_id=areq["client_id"])
except (MissingRequiredAttribute, ValueError, MissingRequiredValue) as err:
return redirect_authz_error("invalid_request", redirect_uri, "%s" % err)
return {"areq": areq, "redirect_uri": redirect_uri}