Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testOidcCallbackSucceeds(self):
    """
    Test that when the authorization provider calls back to us, we can
    handle the reply correctly. This test is the 'succeeds' case.

    The callback URL and the session 'state' are both built from
    ``oic.oauth2.rndstr(0)``. The test expects a 302 redirect to port 8001
    on the local host name.
    """
    url = '/oauth2callback?scope=openid+profile&state={0}&code={1}'.format(
        oic.oauth2.rndstr(0), OICCODE
    )
    # Seed the session first. The transaction block is closed before the
    # request is issued so the modified session is stored for that request
    # (assumes self.app is a Flask test client -- TODO confirm).
    with self.app.session_transaction() as sess:
        sess['state'] = oic.oauth2.rndstr(0)
        sess['nonce'] = oic.oauth2.rndstr(0)
    result = self.app.get(url)
    self.assertEqual(result.status_code, 302)
    self.assertEqual(result.location, 'http://{0}:8001/'.format(
        socket.gethostname()))
# NOTE(review): excerpt from the middle of a larger function. The enclosing
# definition and the original indentation are not visible here, so the
# nesting of the try/except/else clauses below cannot be confirmed.
pass
else:
try:
# Store the first "op" value from `query` in the session.
session["op"] = query["op"][0]
try:
# NOTE(review): `assert` is used to validate the chosen OP; asserts are
# stripped under -O, so an explicit membership test may be safer.
assert session["op"] in SERVER_ENV["OP"]
_op = session["op_conf"] = SERVER_ENV["OP"][session["op"]]
except AssertionError:
return BadRequest("OP chosen that is not configured")
except KeyError:
pass
if _op:
try:
# Reuse the client stored under this session's "opkey", if there is one.
client = SERVER_ENV["OIC_CLIENT"][session["opkey"]]
except KeyError:
# Otherwise create a client under a fresh rndstr() key and register it.
_key = rndstr()
try:
kwargs = {"deviate": _op["deviate"]}
except KeyError:
kwargs = {}
client = create_client(_key, **kwargs)
session["opkey"] = _key
SERVER_ENV["OIC_CLIENT"][session["opkey"]] = client
# With the "discovery" feature on, only the provider's "dynamic" value is
# recorded in the session; otherwise the provider config is handed to the
# client directly.
if _op["features"]["discovery"]:
link = _op["provider"]["dynamic"]
session["srv_discovery_url"] = link
else:
client.handle_provider_config(_op["provider"], session["op"])
else:
resp = Redirect("/")
"pre-test", err)
# NOTE(review): excerpt from the middle of a larger function. The statement
# before this point and the body of the final try: are not visible, and the
# original indentation has been lost.
conv.request_spec = req
if req_c == TEST_FLOWS.Discover:
# Special since it's just a GET on a URL
_r = req.discover(
ots.client, issuer=ots.config.CLIENT["srv_discovery_url"])
# The discover() result is unpacked into three conversation fields.
conv.position, conv.last_response, conv.last_content = _r
logging.debug("Provider info: %s" % conv.last_content._dict)
verify_support(conv, ots, session["graph"])
else:
LOGGER.info("request: %s" % req.request)
if req.request == "AuthorizationRequest":
# New state for each request
kwargs = {"request_args": {"state": rndstr()}}
elif req.request in ["AccessTokenRequest", "UserInfoRequest",
"RefreshAccessTokenRequest"]:
# These requests reuse the state stored on conv.AuthorizationRequest.
kwargs = {"state": conv.AuthorizationRequest["state"]}
else:
kwargs = {}
# Extra arguments outside the OIDC spec
try:
_extra = ots.config.CLIENT["extra"][req.request]
except KeyError:
pass
except Exception as err:
# NOTE(review): "config_exta" looks like a typo for "config_extra"; it is a
# runtime string, so confirm nothing depends on it before changing it.
return err_response(environ, start_response, session,
"config_exta", err)
else:
try:
def __init__(self, conv, session, test_id, conf, funcs):
    """Initialise the request and seed it with endpoint, state and nonce.

    All five arguments are passed unchanged to ``Request.__init__``.
    Afterwards:

    * ``self.op_args["endpoint"]`` is set to the "authorization_endpoint"
      entry of ``conv.client.provider_info``;
    * a new ``rndstr()`` value is stored as ``conv.state`` and copied to
      ``self.req_args["state"]``;
    * a new ``rndstr()`` value is stored as ``conv.nonce`` and copied to
      ``self.req_args["nonce"]``.

    Raises ``KeyError`` if the provider info has no
    "authorization_endpoint" entry (assuming ``provider_info`` is a
    dict-like mapping -- TODO confirm).
    """
    Request.__init__(self, conv, session, test_id, conf, funcs)
    self.op_args["endpoint"] = conv.client.provider_info[
        "authorization_endpoint"]
    # State and nonce are kept on the conversation as well as on the
    # request arguments.
    conv.state = rndstr()
    self.req_args["state"] = conv.state
    conv.nonce = rndstr()
    self.req_args["nonce"] = conv.nonce
def testOidcCallbackBadNonce(self):
    """
    Test that when the authorization provider calls back to us, we can
    handle the reply correctly.
    In this case, the nonce returned does not match that in the session.

    The session nonce is set to the literal 'other' and the test expects
    the callback to be answered with a 403.
    """
    url = '/oauth2callback?scope=openid+profile&state={0}&code={1}'.format(
        oic.oauth2.rndstr(0), OICCODE
    )
    with self.app as app:
        # Close the session transaction before issuing the request so the
        # seeded values are stored for it (assumes a Flask test client --
        # TODO confirm).
        with app.session_transaction() as sess:
            sess['state'] = oic.oauth2.rndstr(0)
            sess['nonce'] = 'other'
        result = app.get(url)
        self.assertEqual(result.status_code, 403)
# Provider bootstrap: build the template environment and authentication
# methods, the userinfo backend and the Provider instance, then write the
# provider's JWKS to a "static" directory next to this file.
template_dirs = settings["server"].get("template_dirs", "templates")
jinja_env = Environment(loader=FileSystemLoader(template_dirs))
authn_broker, auth_routing = setup_authentication_methods(
    settings["authn"], jinja_env)

# Setup userinfo
userinfo_conf = settings["userinfo"]
cls = make_cls_from_name(userinfo_conf["class"])
i = cls(**userinfo_conf["kwargs"])
userinfo = UserInfo(i)

client_db = {}
provider = Provider(issuer, SessionDB(issuer), client_db, authn_broker,
                    userinfo, AuthzHandling(), verify_client, None)
provider.baseurl = issuer
provider.symkey = rndstr(16)

# Setup keys
path = os.path.join(os.path.dirname(__file__), "static")
try:
    os.makedirs(path)
except OSError as e:
    # An already existing directory is fine; any other failure is re-raised
    # with its original traceback.
    if e.errno != errno.EEXIST:
        raise

jwks = keyjar_init(provider, settings["provider"]["keys"])
name = "jwks.json"
with open(os.path.join(path, name), "w") as f:
    f.write(json.dumps(jwks))

# Register the URL of the file that was just written.
provider.jwks_uri.append(
    "{}/static/{}".format(provider.baseurl, name))
# NOTE(review): excerpt from the middle of a method. The try: that this
# first except belongs to is not visible, the final try: is cut off, and
# the original indentation has been lost.
except KeyError:
# Start a new OIC_CLIENT mapping holding just this client.
server_env["OIC_CLIENT"] = {self.name: client}
logger.debug("Session: %s" % session)
logger.debug("Session_id: %s" % session["req_info"].message.id)
# The id of the request message is reused as the state value.
_state = copy.copy(session["req_info"].message.id)
request_args = {
"response_type": self.flow_type,
"scope": self.extra["scope"],
"state": _state,
}
client.state = _state
session["state"] = _state
# A nonce is always added for the "token" flow type; for other flow types
# it is added only when the instance has a truthy `use_nonce` attribute.
if self.flow_type == "token":
request_args["nonce"] = rndstr(16)
session["nonce"] = request_args["nonce"]
else:
use_nonce = getattr(self, "use_nonce", None)
if use_nonce:
request_args["nonce"] = rndstr(16)
session["nonce"] = request_args["nonce"]
server_env["CACHE"][sid] = session
logger.info("client args: %s" % client.__dict__.items(),)
logger.info("request_args: %s" % (request_args,))
# User info claims
try:
cis = client.construct_AuthorizationRequest(
request_args=request_args)
# NOTE(review): excerpt from the middle of a function. The try: blocks that
# the first except clauses belong to are not visible, the trailing if: has
# no body here, and the original indentation has been lost.
except KeyError:
# Create a session logger for this key and store it in OAS.trace_log.
_log = create_session_logger(key)
OAS.trace_log[key] = _log
else:
_log = replace_format_handler(logger)
# Wrap the logger so that path, client and cid are supplied as extra context.
a1 = logging.LoggerAdapter(_log,
{'path': path,
'client': remote,
"cid": key})
except ValueError:
pass
# No adapter yet: build a new key as STR + rndstr() + STR and set up the
# adapter for it.
if not a1:
key = STR + rndstr() + STR
handle = (key, 0)
if hasattr(OAS, "trace_log"):
try:
_log = OAS.trace_log[key]
except KeyError:
_log = OAS.new_trace_log(key)
else:
_log = replace_format_handler(logger)
a1 = logging.LoggerAdapter(_log, {'path': path, 'client': remote,
"cid": key})
#logger.info("handle:%s [%s]" % (handle, a1))
#a1.info(40*"-")
if path.startswith("static/"):
def startLogin():
    """
    If we are not logged in, this generates the redirect URL to the OIDC
    provider and returns the redirect response

    A new 'state' and 'nonce' (each ``SECRET_KEY_LENGTH`` long) are stored
    in the Flask session and sent along with the authorization request.

    :return: A redirect response to the OIDC provider
    """
    flask.session["state"] = oic.oauth2.rndstr(SECRET_KEY_LENGTH)
    flask.session["nonce"] = oic.oauth2.rndstr(SECRET_KEY_LENGTH)
    args = {
        "client_id": app.oidcClient.client_id,
        "response_type": "code",
        "scope": ["openid", "profile"],
        "nonce": flask.session["nonce"],
        # Only the first registered redirect URI is used.
        "redirect_uri": app.oidcClient.redirect_uris[0],
        "state": flask.session["state"]
    }
    result = app.oidcClient.do_authorization_request(
        request_args=args, state=flask.session["state"])
    return flask.redirect(result.url)
def provider_login(request):
    """Start an OpenID Connect login against the requested provider.

    Reads the ``provider`` query parameter, remembers where to send the
    user afterwards, stores a new state and nonce in the session and
    redirects to the provider's authorization endpoint.

    :param request: the incoming request; its ``GET``, ``META`` and
        ``session`` attributes are used.
    :return: ``HttpResponseBadRequest`` when ``provider`` is missing or
        empty, otherwise an ``HttpResponseRedirect`` to the login URL.
    """
    provider = request.GET.get('provider')
    if not provider:
        return HttpResponseBadRequest("Missing provider query parameter.")

    # Prefer an explicit redirect target and fall back to the referer.
    # NOTE(review): both values are supplied by the caller; whoever later
    # redirects to session['redirect_url'] should validate it first.
    redirect_url = request.GET.get('redirect_url')
    if redirect_url:
        request.session['redirect_url'] = redirect_url
    elif 'HTTP_REFERER' in request.META:
        request.session['redirect_url'] = request.META['HTTP_REFERER']

    client = build_client(provider)

    request.session['provider'] = provider
    request.session['state'] = rndstr()
    request.session['nonce'] = rndstr()
    args = {
        "client_id": client.client_id,
        "response_type": "code",
        "scope": ["openid", "email", "profile"],
        "state": request.session['state'],
        "nonce": request.session['nonce'],
        # Only the first registered redirect URI is used.
        "redirect_uri": client.redirect_uris[0]
    }

    authz_req = client.construct_AuthorizationRequest(request_args=args)
    login_url = authz_req.request(client.authorization_endpoint)
    return HttpResponseRedirect(login_url)