Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_omit(self):
    """Check that deleting ``error_uri`` changes the urlencoded output.

    The message is serialized once with all three fields and once after
    ``error_uri`` has been deleted; the two serializations must differ and
    only the first may still contain the URI's ``error_message`` fragment.
    """
    response = ErrorResponse(
        error="invalid_request",
        error_description="Something was missing",
        error_uri="http://example.com/error_message.html",
    )

    # Serialization while the URI is still part of the message.
    encoded_with_uri = response.to_urlencoded()

    # Drop the URI and serialize the very same instance again.
    del response["error_uri"]
    encoded_without_uri = response.to_urlencoded()

    assert encoded_with_uri != encoded_without_uri
    assert "error_message" not in encoded_without_uri
    assert "error_message" in encoded_with_uri
:param response_type:
:param state:
:param kwargs:
:return:
"""
conv.position = url
conv.last_response = response
conv.last_content = response.content
trace.reply("STATUS: %d" % response.status_code)
_response = None
if response.status_code >= 400: # an error
if response.text:
try:
_response = ErrorResponse().from_json(response.text)
except (MessageException, ValueError):
trace.reply("Non OIDC error message: %s" % response.content)
else:
raise MissingErrorResponse()
elif response.status_code == 204: # No response
_response = Message()
else:
try:
uiendp = client.provider_info["userinfo_endpoint"]
except KeyError:
uiendp = ""
if uiendp == url:
_iss = client.provider_info["issuer"]
_ver_keys = client.keyjar.get("ver", issuer=_iss)
_info = [(k.kid, k.kty) for k in _ver_keys]
session["authentication"] = "FAILED"
return False, "Access denied", session
#session.session_id = msg["state"]
logger.debug("callback environ: %s" % environ)
if self.flow_type == "code":
# get the access token
try:
tokenresp = self.get_accesstoken(client, authresp)
except Exception, err:
logger.error("%s" % err)
raise
if isinstance(tokenresp, ErrorResponse):
return (False, "Invalid response %s." % tokenresp["error"],
session)
access_token = tokenresp["access_token"]
else:
access_token = authresp["access_token"]
userinfo = self.verify_token(client, access_token)
inforesp = self.get_userinfo(client, authresp, access_token)
if isinstance(inforesp, ErrorResponse):
return False, "Invalid response %s." % inforesp["error"], session
tot_info = userinfo.update(inforesp.to_dict())
)
if response:
if body_type == "txt":
# no meaning trying to parse unstructured text
return reqresp.text
return self.parse_response(
response, reqresp.text, body_type, state, **kwargs
)
# could be an error response
if reqresp.status_code in [200, 400, 401]:
if body_type == "txt":
body_type = "urlencoded"
try:
err = ErrorResponse().deserialize(reqresp.message, method=body_type)
try:
err.verify()
except PyoidcError:
pass
else:
return err
except Exception:
logger.exception(
"Failed to decode error response (%d) %s",
reqresp.status_code,
sanitize(reqresp.text),
)
return reqresp
class ClientInfoResponse(RegistrationRequest):
    """Registration request parameters plus client-information fields.

    ``c_param`` is a fresh mapping: every entry of
    ``RegistrationRequest.c_param`` followed by the six client-information
    entries below. The parent's mapping is not modified.
    """

    c_param = dict(
        RegistrationRequest.c_param,
        client_id=SINGLE_REQUIRED_STRING,
        client_secret=SINGLE_OPTIONAL_STRING,
        client_id_issued_at=SINGLE_OPTIONAL_INT,
        client_secret_expires_at=SINGLE_OPTIONAL_INT,
        registration_access_token=SINGLE_REQUIRED_STRING,
        registration_client_uri=SINGLE_REQUIRED_STRING,
    )
class ClientRegistrationError(ErrorResponse):
    """Error response with an extra ``state`` entry and its own error codes.

    Both class mappings are new objects built from the corresponding
    ``ErrorResponse`` mappings, so the parent's definitions stay untouched.
    """

    # Parent parameters plus a ``state`` entry.
    c_param = dict(ErrorResponse.c_param, state=SINGLE_OPTIONAL_STRING)

    # Parent allowed values, with the ``error`` entry set to these codes.
    c_allowed_values = dict(
        ErrorResponse.c_allowed_values,
        error=[
            "invalid_redirect_uri",
            "invalid_client_metadata",
            "invalid_client_id",
        ],
    )
class ClientUpdateRequest(RegistrationRequest):
c_param = RegistrationRequest.c_param.copy()
c_param.update(
has_reg_uri = "registration_client_uri" in self
has_reg_at = "registration_access_token" in self
if has_reg_uri != has_reg_at:
raise VerificationError(
(
"Only one of registration_client_uri"
" and registration_access_token present"
),
self,
)
return True
class ClientRegistrationErrorResponse(message.ErrorResponse):
    """Error response whose ``c_allowed_values`` lists three ``error`` codes.

    The mapping is defined from scratch here and is not derived from the
    parent class's mapping.
    """

    c_allowed_values = dict(
        error=[
            "invalid_redirect_uri",
            "invalid_client_metadata",
            "invalid_configuration_parameter",
        ],
    )
class IdToken(OpenIDSchema):
c_param = OpenIDSchema.c_param.copy()
c_param.update(
{
"iss": SINGLE_REQUIRED_STRING,
"sub": SINGLE_REQUIRED_STRING,
"aud": REQUIRED_LIST_OF_STRINGS, # Array of strings or string
req_args = {
"redirect_uri": callback,
"client_secret": self.client_secret,
}
client.token_endpoint = self.extra["token_endpoint"]
tokenresp = client.do_access_token_request(
scope=self._scope,
body_type=self.token_response_body_type,
request_args=req_args,
authn_method="client_secret_post",
state=response["state"],
response_cls=self.access_token_response)
if isinstance(tokenresp, ErrorResponse):
logger.info("%s" % tokenresp)
return (False, "Authentication failed or permission not granted")
# Download the user profile and cache a local instance of the
# basic profile info
result = client.fetch_protected_resource(
self.userinfo_endpoint(tokenresp), token=tokenresp["access_token"])
logger.info("Userinfo: %s" % result.text)
profile = json.loads(result.text)
return True, profile, tokenresp["access_token"], client
if resp.status_code == 200:
if "application/json" in resp.headers["content-type"]:
sformat = "json"
elif "application/jwt" in resp.headers["content-type"]:
sformat = "jwt"
else:
raise PyoidcError(
"ERROR: Unexpected content-type: %s" % resp.headers["content-type"]
)
elif resp.status_code == 500:
raise PyoidcError("ERROR: Something went wrong: %s" % resp.text)
elif 400 <= resp.status_code < 500:
# the response text might be a OIDC message
try:
res = ErrorResponse().from_json(resp.text)
except Exception:
raise RequestError(resp.text)
else:
self.store_response(res, resp.text)
return res
else:
raise PyoidcError(
"ERROR: Something went wrong [%s]: %s" % (resp.status_code, resp.text)
)
try:
_schema = kwargs["user_info_schema"]
except KeyError:
_schema = OpenIDSchema
logger.debug("Reponse text: '%s'" % sanitize(resp.text))
def phaseN(self, environ, info, server_env, sid):
"""Step 2: Once the consumer has redirected the user back to the
callback URL you can request the access token the user has
approved."""
client = server_env["OIC_CLIENT"][self.name]
logger.debug("info: %s" % info)
logger.debug("keyjar: %s" % client.keyjar)
authresp = client.parse_response(AuthorizationResponse, info,
sformat="dict")
session = server_env["CACHE"][sid]
if isinstance(authresp, ErrorResponse):
session["authentication"] = "FAILED"
return False, "Access denied", session
#session.session_id = msg["state"]
logger.debug("callback environ: %s" % environ)
if self.flow_type == "code":
# get the access token
try:
tokenresp = self.get_accesstoken(client, authresp)
except Exception, err:
logger.error("%s" % err)
raise
if isinstance(tokenresp, ErrorResponse):