Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def make_client(**kw_args):
    """Build a ``Client`` from keyword settings.

    The keywords ``keyjar``, ``kidd`` and ``jwks_uri`` are required (a
    missing one raises ``KeyError``).  When a ``conf`` keyword is given and
    its ``INFO`` mapping has a ``"client"`` entry, every item of that entry
    is copied onto the client as an attribute.

    :param kw_args: keyword settings as described above
    :return: the configured client instance
    """
    client = Client(
        client_authn_method=CLIENT_AUTHN_METHOD, keyjar=kw_args["keyjar"]
    )
    client.kid = kw_args["kidd"]
    client.jwks_uri = kw_args["jwks_uri"]

    try:
        overrides = kw_args["conf"].INFO["client"]
    except KeyError:
        # No "conf" keyword, or no "client" section: nothing more to apply.
        return client

    for attr_name, attr_value in overrides.items():
        setattr(client, attr_name, attr_value)
    return client
def test_fetch_distributed_claims_with_no_callback():
    """Distributed claims are merged into the result when ``callback`` is None.

    The client's ``http_request`` is replaced by ``fake_request`` so no real
    HTTP call is made to the claim source endpoint.
    """
    client = Client(CLIENT_ID, client_authn_method=CLIENT_AUTHN_METHOD)
    client.http_request = fake_request  # type: ignore # FIXME: Replace with responses?

    claim_names = {"shoe_size": "src1"}
    claim_sources = {
        "src1": {"endpoint": "https://bank.example.com/claim_source"},
    }
    userinfo = {
        "sub": "foobar",
        "_claim_names": claim_names,
        "_claim_sources": claim_sources,
    }

    resolved = client.fetch_distributed_claims(userinfo, callback=None)

    assert resolved["shoe_size"] == 10
    assert resolved["sub"] == "foobar"
class OIDCError(Exception):
    """Plain ``Exception`` subclass with no added behaviour.

    Judging by the name it marks OIDC-related failures.
    """
class MissingErrorResponse(Exception):
    """Plain ``Exception`` subclass with no added behaviour.

    Judging by the name it signals that an expected error response was
    not received.
    """
def flow2sequence(operations, item):
    """Translate the flow called *item* into its list of phase definitions.

    :param operations: object exposing ``FLOWS`` (flow name -> mapping with
        a ``"sequence"`` list of phase names) and ``PHASES`` (phase name ->
        phase definition)
    :param item: key of the flow in ``operations.FLOWS``
    :return: list with ``operations.PHASES[name]`` for every name in the
        flow's sequence, in order
    """
    phase_names = operations.FLOWS[item]["sequence"]
    resolved = []
    for phase_name in phase_names:
        resolved.append(operations.PHASES[phase_name])
    return resolved
class Client(oic.Client):
    """``oic.Client`` subclass that can additionally carry a ``behaviour``."""

    def __init__(self, client_id=None, ca_certs=None,
                 client_prefs=None, client_authn_methods=None, keyjar=None,
                 verify_ssl=True, behaviour=None):
        """Initialise the base client and optionally record ``behaviour``.

        All arguments except ``behaviour`` are forwarded positionally, in
        signature order, to ``oic.Client.__init__``.  ``self.behaviour`` is
        only assigned when a truthy ``behaviour`` is passed.
        """
        oic.Client.__init__(
            self,
            client_id,
            ca_certs,
            client_prefs,
            client_authn_methods,
            keyjar,
            verify_ssl,
        )
        if not behaviour:
            return
        self.behaviour = behaviour
class OIDCTestSetup(object):
    # NOTE(review): only the constructor signature and its docstring are
    # visible in this excerpt; the constructor body is not shown.
    def __init__(self, config, test_defs, port, client_cls=Client):
        """
        Prepare a test setup from the given configuration.

        :param config: Imported configuration module
        :param test_defs: test definitions (presumably the flow/phase tables
            to run -- TODO confirm against callers)
        :param port: port value (its use is not visible in this excerpt)
        :param client_cls: client class; defaults to ``Client``
        :return:
        """
def _create_client(provider_metadata, client_metadata, verify_ssl=True):
    """
    Create a pyoidc client instance.

    Provider configuration is applied statically when ``provider_metadata``
    contains ``authorization_endpoint``; otherwise it is discovered from
    ``provider_metadata["issuer"]``.  Registration info is stored statically
    when ``client_metadata`` contains ``client_id``.

    :param provider_metadata: provider configuration information
    :type provider_metadata: Mapping[str, Union[str, Sequence[str]]]
    :param client_metadata: client metadata
    :type client_metadata: Mapping[str, Union[str, Sequence[str]]]
    :param verify_ssl: forwarded to ``oic.Client`` as ``verify_ssl``
    :return: client instance to use for communicating with the configured provider
    :rtype: oic.oic.Client
    """
    client = oic.Client(
        client_authn_method=CLIENT_AUTHN_METHOD, verify_ssl=verify_ssl
    )
    # Provider configuration information
    if "authorization_endpoint" in provider_metadata:
        # no dynamic discovery necessary
        client.handle_provider_config(ProviderConfigurationResponse(**provider_metadata),
                                      provider_metadata["issuer"])
    else:
        # do dynamic discovery
        client.provider_config(provider_metadata["issuer"])
    # Client information
    if "client_id" in client_metadata:
        # static client info provided
        client.store_registration_info(RegistrationRequest(**client_metadata))
    # NOTE(review): the excerpt ends here -- neither the branch for a missing
    # "client_id" nor the documented return of ``client`` is visible.
from oic.oic.message import OpenIDSchema
from oic.utils.authn.client import CLIENT_AUTHN_METHOD
from oic.utils.http_util import Redirect
from oic.utils.sanitize import sanitize
# Module author tag, and a module-level logger named after this module.
__author__ = "roland"
logger = logging.getLogger(__name__)
class OIDCError(Exception):
    """Bare ``Exception`` subclass; adds no attributes or methods.

    The name suggests it is raised for OIDC-related failures.
    """
class Client(oic.Client):
    """``oic.Client`` subclass whose constructor takes extra settings.

    Besides the base-client arguments the signature accepts ``behaviour``,
    ``config``, ``jwks_uri`` and ``kid``; how they are used is not visible
    in this excerpt.
    """

    def __init__(
        self,
        client_id=None,
        client_prefs=None,
        client_authn_method=None,
        keyjar=None,
        verify_ssl=True,
        behaviour=None,
        config=None,
        jwks_uri="",
        kid=None,
    ):
        """Initialise the base ``oic.Client`` (arguments passed positionally)."""
        oic.Client.__init__(
            self,
            client_id,
            client_prefs,
            # NOTE(review): the excerpt is cut off here, inside the argument
            # list of the base-class constructor call.
# NOTE(review): fragment of an application-setup routine -- the enclosing
# function and the condition guarding this ``raise`` are outside the excerpt.
raise exceptions.ConfigurationException(
    "Unsupported data source scheme: " + dataSource.scheme)
# Build the backend on the data repository and apply validation switches and
# size limits taken from the application configuration.
theBackend = backend.Backend(dataRepository)
theBackend.setRequestValidation(app.config["REQUEST_VALIDATION"])
theBackend.setResponseValidation(app.config["RESPONSE_VALIDATION"])
theBackend.setDefaultPageSize(app.config["DEFAULT_PAGE_SIZE"])
theBackend.setMaxResponseLength(app.config["MAX_RESPONSE_LENGTH"])
app.backend = theBackend
# A fresh random secret key is generated every time this code runs.
app.secret_key = os.urandom(SECRET_KEY_LENGTH)
# OIDC state defaults to "disabled"; it is filled in below only when an
# OIDC provider is configured.
app.oidcClient = None
app.tokenMap = None
app.myPort = port
if "OIDC_PROVIDER" in app.config:
    # The oic client. If we're testing, we don't want to verify
    # SSL certificates
    # NOTE(review): verification is switched off by the mere presence of the
    # 'TESTING' key, whatever its value.
    app.oidcClient = oic.oic.Client(
        verify_ssl=('TESTING' not in app.config))
    app.tokenMap = {}
    try:
        # Ask the provider for its configuration first.
        app.oidcClient.provider_config(app.config['OIDC_PROVIDER'])
    except requests.exceptions.ConnectionError:
        # Provider not reachable: fall back to the endpoints listed in the
        # application configuration.
        configResponse = message.ProviderConfigurationResponse(
            issuer=app.config['OIDC_PROVIDER'],
            authorization_endpoint=app.config['OIDC_AUTHZ_ENDPOINT'],
            token_endpoint=app.config['OIDC_TOKEN_ENDPOINT'],
            revocation_endpoint=app.config['OIDC_TOKEN_REV_ENDPOINT'])
        app.oidcClient.handle_provider_config(configResponse,
                                              app.config['OIDC_PROVIDER'])
    # The redirect URI comes from the configuration.
    # If we are testing, then we allow the automatic creation of a
    # redirect uri if none is configured
    # NOTE(review): the code this comment introduces is not part of the excerpt.
from oic.utils.authn.client import CLIENT_AUTHN_METHOD
from oic.utils.keyio import keyjar_init
from six.moves.urllib.parse import urlparse
from six.moves.urllib.parse import urlencode
from signed_http_req import sign_http
# Module author tag, and a module-level logger named after this module.
__author__ = 'roland'
logger = logging.getLogger(__name__)
class OIDCError(Exception):
    """Exception subclass without extra behaviour.

    Presumably raised for OIDC-related failures (inferred from the name).
    """
class Client(oic.Client):
    """``oic.Client`` subclass that stores a ``behaviour`` mapping and uses
    it when building authentication requests."""

    def __init__(self, client_id=None, ca_certs=None,
                 client_prefs=None, client_authn_method=None, keyjar=None,
                 verify_ssl=True, behaviour=None):
        """Initialise the base client; keep ``behaviour`` if one is given.

        All arguments except ``behaviour`` are forwarded positionally to
        ``oic.Client.__init__``.  ``self.behaviour`` is only assigned when
        ``behaviour`` is truthy.
        """
        oic.Client.__init__(self, client_id, ca_certs, client_prefs,
                            client_authn_method, keyjar, verify_ssl)
        if behaviour:
            self.behaviour = behaviour

    def create_authn_request(self, session, acr_value=None, **kwargs):
        """Start assembling an authentication request.

        Writes new ``state`` and ``nonce`` values (from ``rndstr()``) into
        ``session`` and uses them, together with ``response_type`` and
        ``scope`` from ``self.behaviour``, as request arguments.
        """
        # Stored in the session -- presumably so the response can be matched
        # against them later (TODO confirm against the response handler).
        session["state"] = rndstr()
        session["nonce"] = rndstr()
        request_args = {
            "response_type": self.behaviour["response_type"],
            "scope": self.behaviour["scope"],
            "state": session["state"],
            "nonce": session["nonce"],
            # NOTE(review): the excerpt is cut off here, inside the dict
            # literal; the rest of the method is not visible.
return Response(cresp.to_json(), content="application/json")
def claims_info_endpoint(self, request, authn):
    """Answer a claims-info query with the claims stored for its token.

    :param request: raw request; it is logged (sanitized) and parsed with
        ``self.srvmethod.parse_userinfo_claims_request``
    :param authn: not used by this method
    :return: ``Response`` whose body is the JSON form of the stored claims,
        with content type ``application/json``
    """
    logger.info("Claims_info_endpoint query: '%s'" % sanitize(request))

    claims_request = self.srvmethod.parse_userinfo_claims_request(request)
    # Access_token is mandatory in UserInfoClaimsRequest
    access_token = claims_request["access_token"]
    stored_claims = self.info_store[access_token]
    claims_message = OpenIDSchema(**stored_claims)

    logger.info("returning: %s" % sanitize(claims_message.to_dict()))
    body = claims_message.to_json()
    return Response(body, content="application/json")
class ClaimsClient(Client):
    """``Client`` extended with the user-claims request/response mappings."""

    def __init__(self, client_id=None, verify_ssl=True):
        """Initialise the base client and register the user-claims entries.

        Works on copies of ``REQUEST2ENDPOINT`` and ``RESPONSE2ERROR`` so the
        module-level tables are left unmodified.
        """
        Client.__init__(self, client_id, verify_ssl=verify_ssl)

        endpoint_by_request = REQUEST2ENDPOINT.copy()
        endpoint_by_request["UserClaimsRequest"] = "userclaims_endpoint"
        self.request2endpoint = endpoint_by_request

        errors_by_response = RESPONSE2ERROR.copy()
        errors_by_response["UserClaimsResponse"] = ["ErrorResponse"]
        self.response2error = errors_by_response

    def construct_UserClaimsRequest(
        self,
        request=UserClaimsRequest,
        request_args=None,
        extra_args=None,
        **kwargs
    ):
        """Build a user-claims request via ``construct_request``.

        Additional keyword arguments are accepted but not forwarded.
        """
        built_request = self.construct_request(request, request_args, extra_args)
        return built_request
def do_claims_request(
def __init__(self, client_metadata):
    """Create the wrapped ``OIDCClient`` using ``CLIENT_AUTHN_METHOD``.

    NOTE(review): ``client_metadata`` is not used in the visible part; the
    excerpt is probably truncated.
    """
    self.client = OIDCClient(client_authn_method=CLIENT_AUTHN_METHOD)
def __init__(self, flask_app, client_registration_info=None, issuer=None,
             provider_configuration_info=None, userinfo_endpoint_method='POST',
             extra_request_args=None):
    """Configure OIDC handling for a Flask application.

    :param flask_app: application on which the ``/redirect_uri`` rule is
        registered
    :param client_registration_info: stored as given; ``{}`` when falsy
    :param issuer: issuer used for dynamic provider discovery
    :param provider_configuration_info: static provider configuration; must
        contain ``issuer`` when it is used
    :param userinfo_endpoint_method: stored as given (default ``'POST'``)
    :param extra_request_args: stored as given; ``{}`` when falsy
    :raises ValueError: if neither ``issuer`` nor
        ``provider_configuration_info`` is given
    """
    self.app = flask_app
    self.userinfo_endpoint_method = userinfo_endpoint_method
    self.extra_request_args = extra_request_args or {}
    self.client = Client(client_authn_method=CLIENT_AUTHN_METHOD)
    if not issuer and not provider_configuration_info:
        # NOTE(review): the message below has an unbalanced "(" -- worth
        # fixing in a code change.
        raise ValueError(
            'Either \'issuer\' (for dynamic discovery) or \'provider_configuration_info\' (for static configuration must be specified.')
    if issuer and not provider_configuration_info:
        # Only an issuer was given: discover the provider configuration.
        self.client.provider_config(issuer)
    else:
        # Static configuration wins whenever it is supplied, even if an
        # issuer was passed as well.
        self.client.handle_provider_config(
            ProviderConfigurationResponse(**provider_configuration_info),
            provider_configuration_info['issuer'])
    self.client_registration_info = client_registration_info or {}
    # setup redirect_uri
    self.app.add_url_rule('/redirect_uri', 'redirect_uri',
                          self._handle_authentication_response)
    # NOTE(review): the excerpt ends on the ``with`` header below; its body
    # is not visible.
    with self.app.app_context():