Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def static(environ, session, path):
    """Serve the file at *path* as a static resource.

    The content type is looked up in ``CTYPE_MAP`` using the text after the
    last ``.`` in *path*; unknown extensions fall back to the ``"txt"`` entry.

    :param environ: WSGI environment (not used by this handler).
    :param session: Session object (not used by this handler).
    :param path: Path of the file to send.
    :return: A ``Response`` carrying the file content, or ``NotFound()`` when
        the file cannot be opened or read.
    """
    # NOTE(review): *path* is opened as given; presumably the caller confines
    # it to the static directory -- confirm there is no path traversal.
    logger.info("[static]sending: %s", path)

    try:
        # Context manager guarantees the handle is closed even if read() fails
        # (the previous ``open(path).read()`` leaked the descriptor).
        with open(path) as handle:
            text = handle.read()
    except IOError:
        return NotFound()

    ext = path.rsplit(".", 1)[-1]
    try:
        ctype = CTYPE_MAP[ext]
    except KeyError:
        # Unknown extension: fall back to the plain-text content type.
        ctype = CTYPE_MAP["txt"]

    return Response(text, headers=[('Content-Type', ctype)])
def flow_list(self, session):
resp = Response(mako_template="flowlist.mako",
template_lookup=self.lookup,
headers=[])
try:
_tid = session["testid"]
except KeyError:
_tid = None
self.dump_log(session, _tid)
argv = {
"flows": session["tests"],
"profile": session["profile"],
"test_info": session["test_info"].keys(),
"base": self.conf.BASE,
"headlines": self.test_flows.DESC,
"subject": "https://haho0034@hashog.umdc.umu.se:8999"
}']
:param environ: WSGI environment.
:param start_response: WSGI start response.
:return: oic.utils.http_util.Response object.
"""
query = parse_qs(environ["QUERY_STRING"])
try:
assert query["rel"] == [OIC_ISSUER]
resource = query["resource"][0]
except KeyError:
resp = http_util.BadRequest("Missing parameter in request.")
else:
wf = WebFinger()
resp = http_util.Response(wf.response(subject=resource, base=self.provider.baseurl))
return resp(environ, start_response)
_log_info("Exception err: %s" % err)
raise
else:
pass
_log_info("AU: %s" % aresp)
_log_info("AT: %s" % atr)
_log_info("DUMP: %s" % (_cli.sdb[_cli.state],))
_log_info("[2] %s" % (_cli.__dict__,))
# Valid for 6 hours (=360 minutes)
kaka = http_util.cookie(_conc["name"], _cli.state, _cli.seed, expire=360,
path="/")
resp = http_util.Response("Your will is registered", headers=[kaka])
_log_info("Cookie: %s" % (kaka,))
return resp(environ, start_response)
jinfo = self.encrypt(
jinfo, _cinfo, session["client_id"], "userinfo", ""
)
content_type = "application/jwt"
else:
jinfo = info.to_json()
content_type = "application/json"
except NotSupportedAlgorithm as err:
return error_response(
"invalid_request",
descr="Not supported algorithm: {}".format(err.args[0]),
)
except JWEException:
return error_response("invalid_request", descr="Could not encrypt")
return Response(jinfo, content=content_type)
try:
self.get_redirect_uri(areq)
except (RedirectURIError, ParameterError) as err:
return error_response("invalid_request", "%s" % err)
except Exception as err:
message = traceback.format_exception(*sys.exc_info())
logger.error(message)
logger.debug("Bad request: %s (%s)" % (err, err.__class__.__name__))
error = ErrorResponse(error="invalid_request", error_description=str(err))
return BadRequest(error.to_json(), content="application/json")
if not areq:
logger.debug("No AuthzRequest")
return error_response("invalid_request", "Can not parse AuthzRequest")
if isinstance(areq, Response):
return areq
areq = self.filter_request(areq)
if self.events:
self.events.store("Protocol request", areq)
try:
_cinfo = self.cdb[areq["client_id"]]
except KeyError:
logger.error(
"Client ID ({}) not in client database".format(areq["client_id"])
)
return error_response("unauthorized_client", "unknown client")
else:
try:
def jqauthz(environ, start_response, path):
    """Render the ``jqa.mako`` template as the WSGI response.

    :param environ: WSGI environment; the Mako template lookup is read from
        its ``"mako.lookup"`` key.
    :param start_response: WSGI start-response callable.
    :param path: Requested path (not used by this handler).
    :return: The result of calling the ``http_util.Response`` object with
        *environ* and *start_response*.
    """
    response_cls = http_util.Response
    page = response_cls(
        mako_template="jqa.mako",
        template_lookup=environ["mako.lookup"],
    )
    return page(environ, start_response)
"id_token_hint": id_token_as_signed_jwt(client, _idtoken,
"HS256")})
# Also append the ACR values
logout_url += "&" + urlparse.urlencode({"acr_values": ACR_VALUES},
True)
LOGGER.debug("Logout URL: %s" % str(logout_url))
LOGGER.debug("Logging out from session: %s" % str(session))
session.delete()
resp = Redirect(str(logout_url))
return resp(environ, start_response)
elif path == "logout_success": # post_logout_redirect_uri
return Response("Logout successful!")(environ, start_response)
elif path == "session_iframe": # session management
kwargs = session["session_management"]
resp = Response(mako_template="rp_session_iframe.mako",
template_lookup=LOOKUP)
return resp(environ, start_response,
session_change_url="{}session_change".format(
SERVER_ENV["base_url"]),
**kwargs)
elif path == "session_change":
try:
client = CLIENTS[session["op"]]
except KeyError:
return Response("No valid session.")(environ, start_response)
kwargs = {"prompt": "none"}
# If there is an ID token send it along as a id_token_hint
idt = get_id_token(client, session)
if idt:
kwargs["id_token_hint"] = id_token_as_signed_jwt(client, idt,
request = parse_qs(get_or_post(environ))
_cli = CONSUMER[unquote(request["iss"][0])]
session["client"] = _cli
resp = SeeOther(_cli.begin(RP_CONF.BASE, path))
return resp(environ, start_response)
if path == "authz_cb":
_cli = session["client"]
request = get_or_post(environ)
aresp = _cli.handle_authorization_response(request)
rargs = {"code": aresp["code"]}
atresp = _cli.do_access_token_request(request_args=rargs)
#extra_args=None, http_args=None,)
# Access token should be stored somewhere for later usage
Token[atresp["state"]] = atresp
resp = Response("Got access token: %s" % atresp["access_token"])
return resp(environ, start_response)
return as_choice(environ, start_response)