Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class DummyMessage(Message):
c_param = {
"req_str": SINGLE_REQUIRED_STRING,
"opt_str": SINGLE_OPTIONAL_STRING,
"opt_int": SINGLE_OPTIONAL_INT,
"opt_str_list": OPTIONAL_LIST_OF_STRINGS,
"req_str_list": REQUIRED_LIST_OF_STRINGS,
"opt_json": SINGLE_OPTIONAL_JSON,
}
class StarMessage(Message):
c_param = {"*": SINGLE_REQUIRED_STRING}
class MessageListMessage(Message):
c_param = {"opt_message_list": ParamDefinition([Message], False, None, None, False)}
class TestMessage(object):
def test_json_serialization(self):
item = DummyMessage(
req_str="Fair",
opt_str="game",
opt_int=9,
opt_str_list=["one", "two"],
req_str_list=["spike", "lee"],
opt_json='{"ford": "green"}',
)
jso = item.serialize(method="json")
item2 = DummyMessage().deserialize(jso, "json")
conv.last_response = response
conv.last_content = response.content
trace.reply("STATUS: %d" % response.status_code)
_response = None
if response.status_code >= 400: # an error
if response.text:
try:
_response = ErrorResponse().from_json(response.text)
except (MessageException, ValueError):
trace.reply("Non OIDC error message: %s" % response.content)
else:
raise MissingErrorResponse()
elif response.status_code == 204: # No response
_response = Message()
else:
try:
uiendp = client.provider_info["userinfo_endpoint"]
except KeyError:
uiendp = ""
if uiendp == url:
_iss = client.provider_info["issuer"]
_ver_keys = client.keyjar.get("ver", issuer=_iss)
_info = [(k.kid, k.kty) for k in _ver_keys]
trace.info("Available verification keys: {}".format(_info))
kwargs["key"] = _ver_keys
_dec_keys = client.keyjar.get("enc", issuer="")
_info = [(k.kid, k.kty) for k in _dec_keys]
trace.info("Available decryption keys: {}".format(_info))
kwargs["key"].extend(_dec_keys)
_jwt = JWT(keyjar, msgtype=SoftwareStatement, iss=iss, **args)
return _jwt.pack(**kwargs)
def unpack_software_statement(software_statement, iss, keyjar):
_jwt = JWT(keyjar, iss=iss, msgtype=SoftwareStatement)
return _jwt.unpack(software_statement)
class ExtensionMessageFactory(OauthMessageFactory):
"""Message factory for Extension code."""
introspection_endpoint = MessageTuple(
TokenIntrospectionRequest, TokenIntrospectionResponse
)
revocation_endpoint = MessageTuple(TokenRevocationRequest, Message)
registration_endpoint = MessageTuple(RegistrationRequest, ClientInfoResponse)
update_endpoint = MessageTuple(ClientUpdateRequest, ClientInfoResponse)
delete_endpoint = MessageTuple(ClientUpdateRequest, ClientInfoResponse)
"client_id": SINGLE_REQUIRED_STRING,
"client_secret": SINGLE_REQUIRED_STRING,
"claims_names": REQUIRED_LIST_OF_STRINGS,
}
class UserClaimsResponse(Message):
c_param = {
"claims_names": REQUIRED_LIST_OF_STRINGS,
"jwt": SINGLE_OPTIONAL_STRING,
"endpoint": SINGLE_OPTIONAL_STRING,
"access_token": SINGLE_OPTIONAL_STRING,
}
class UserInfoClaimsRequest(Message):
c_param = {"access_token": SINGLE_REQUIRED_STRING}
class OICCServer(OicServer):
def parse_user_claims_request(self, info, sformat="urlencoded"):
return self._parse_request(UserClaimsRequest, info, sformat)
def parse_userinfo_claims_request(self, info, sformat="urlencoded"):
return self._parse_request(UserInfoClaimsRequest, info, sformat)
class ClaimsServer(Provider):
def __init__(
self,
name,
sdb,
from oic.oauth2.message import SINGLE_OPTIONAL_STRING
from oic.oauth2.message import SINGLE_REQUIRED_STRING
from oic.oauth2.message import ErrorResponse
from oic.oauth2.message import Message
from oic.oauth2.message import MessageTuple
from oic.oauth2.message import OauthMessageFactory
from oic.oauth2.message import ParamDefinition
from oic.oic.message import JasonWebToken
from oic.utils.http_util import SUCCESSFUL
from oic.utils.jwt import JWT
__author__ = "roland"
# RFC 7662
class TokenIntrospectionRequest(Message):
c_param = {
"token": SINGLE_REQUIRED_STRING,
"token_type_hint": SINGLE_OPTIONAL_STRING,
# The ones below are part of authentication information
"client_id": SINGLE_OPTIONAL_STRING,
"client_assertion_type": SINGLE_OPTIONAL_STRING,
"client_assertion": SINGLE_OPTIONAL_STRING,
}
SINGLE_REQUIRED_BOOLEAN = ParamDefinition(bool, True, None, None, False)
class TokenIntrospectionResponse(Message):
c_param = {
"active": SINGLE_REQUIRED_BOOLEAN,
SINGLE_OPTIONAL_JWT = ParamDefinition(Message, False, msg_ser, jwt_deser, False)
class UserInfoErrorResponse(message.ErrorResponse):
c_allowed_values = {
"error": [
"invalid_schema",
"invalid_request",
"invalid_token",
"insufficient_scope",
]
}
class DiscoveryRequest(Message):
c_param = {"principal": SINGLE_REQUIRED_STRING, "service": SINGLE_REQUIRED_STRING}
class DiscoveryResponse(Message):
c_param = {"locations": REQUIRED_LIST_OF_STRINGS}
class ResourceRequest(Message):
c_param = {"access_token": SINGLE_OPTIONAL_STRING}
SCOPE2CLAIMS = {
"openid": ["sub"],
"profile": [
"name",
"given_name",
class Claims(Message):
pass
class ClaimsRequest(Message):
c_param = {
"userinfo": OPTIONAL_MULTIPLE_Claims,
"id_token": OPTIONAL_MULTIPLE_Claims,
}
class OpenIDRequest(AuthorizationRequest):
pass
class ProviderConfigurationResponse(Message):
c_param = {
"issuer": SINGLE_REQUIRED_STRING,
"authorization_endpoint": SINGLE_REQUIRED_STRING,
"token_endpoint": SINGLE_OPTIONAL_STRING,
"userinfo_endpoint": SINGLE_OPTIONAL_STRING,
"jwks_uri": SINGLE_REQUIRED_STRING,
"registration_endpoint": SINGLE_OPTIONAL_STRING,
"scopes_supported": OPTIONAL_LIST_OF_STRINGS,
"response_types_supported": REQUIRED_LIST_OF_STRINGS,
"response_modes_supported": OPTIONAL_LIST_OF_STRINGS,
"grant_types_supported": OPTIONAL_LIST_OF_STRINGS,
"acr_values_supported": OPTIONAL_LIST_OF_STRINGS,
"subject_types_supported": REQUIRED_LIST_OF_STRINGS,
"id_token_signing_alg_values_supported": REQUIRED_LIST_OF_STRINGS,
"id_token_encryption_alg_values_supported": OPTIONAL_LIST_OF_STRINGS,
"id_token_encryption_enc_values_supported": OPTIONAL_LIST_OF_STRINGS,
super(RefreshSessionRequest, self).verify(**kwargs)
if "id_token" in self:
self["id_token"] = verify_id_token(self, check_hash=True, **kwargs)
class RefreshSessionResponse(StateFullMessage):
c_param = StateFullMessage.c_param.copy()
c_param.update({"id_token": SINGLE_REQUIRED_STRING})
def verify(self, **kwargs):
super(RefreshSessionResponse, self).verify(**kwargs)
if "id_token" in self:
self["id_token"] = verify_id_token(self, check_hash=True, **kwargs)
class CheckSessionRequest(Message):
c_param = {"id_token": SINGLE_REQUIRED_STRING}
def verify(self, **kwargs):
super(CheckSessionRequest, self).verify(**kwargs)
if "id_token" in self:
self["id_token"] = verify_id_token(self, check_hash=True, **kwargs)
class CheckIDRequest(Message):
c_param = {"access_token": SINGLE_REQUIRED_STRING}
class EndSessionRequest(Message):
c_param = {
"id_token_hint": SINGLE_OPTIONAL_STRING,
"post_logout_redirect_uri": SINGLE_OPTIONAL_STRING,
def factory(msgtype):
for _, obj in inspect.getmembers(sys.modules[__name__]):
if inspect.isclass(obj) and issubclass(obj, Message):
try:
if obj.__name__ == msgtype:
return obj
except AttributeError:
pass
# check among standard OAuth2 messages
from oic.oauth2 import message
return message.factory(msgtype)
"scope": OPTIONAL_LIST_OF_SP_SEP_STRINGS,
"client_id": SINGLE_OPTIONAL_STRING,
"username": SINGLE_OPTIONAL_STRING,
"token_type": SINGLE_OPTIONAL_STRING,
"exp": SINGLE_OPTIONAL_INT,
"iat": SINGLE_OPTIONAL_INT,
"nbf": SINGLE_OPTIONAL_INT,
"sub": SINGLE_OPTIONAL_STRING,
"aud": OPTIONAL_LIST_OF_STRINGS,
"iss": SINGLE_OPTIONAL_STRING,
"jti": SINGLE_OPTIONAL_STRING,
}
# RFC 7009
class TokenRevocationRequest(Message):
c_param = {
"token": SINGLE_REQUIRED_STRING,
"token_type_hint": SINGLE_OPTIONAL_STRING,
"client_id": SINGLE_OPTIONAL_STRING,
"client_assertion_type": SINGLE_OPTIONAL_STRING,
"client_assertion": SINGLE_OPTIONAL_STRING,
}
class SoftwareStatement(JasonWebToken):
c_param = JasonWebToken.c_param.copy()
c_param.update(
{
"software_id": SINGLE_OPTIONAL_STRING,
"client_name": SINGLE_OPTIONAL_STRING,
"client_uri": SINGLE_OPTIONAL_STRING,