Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def response(self, binding, http_args, query):
    """Build the HTTP response for *binding* and create the state cookie.

    A cookie is created via ``self.create_cookie`` from a small JSON-shaped
    string holding the base64-encoded *query* under ``self.CONST_QUERY`` and
    the literal ``"True"`` under ``self.CONST_HASIDP``.

    :param binding: Binding identifier; compared against
        ``BINDING_HTTP_ARTIFACT`` and ``BINDING_HTTP_REDIRECT``.
    :param http_args: Mapping with a ``"headers"`` list of
        ``(name, value)`` pairs and, for the fallback branch, a ``"data"``
        entry used as the response body.
    :param query: Text that must be ASCII-encodable; it is base64-encoded
        into the cookie payload.
    :return: A ``SeeOther`` for the artifact and redirect bindings,
        otherwise a ``Response``.
    :raises ServiceErrorException: For the redirect binding when no
        ``"Location"`` header is present in ``http_args["headers"]``.
    """
    # The payload is assembled by plain concatenation so the exact spacing
    # of the original string (note the '" , "' separator) is kept.
    payload = "".join(
        (
            '{"',
            self.CONST_QUERY,
            '": "',
            base64.b64encode(query.encode("ascii")).decode("ascii"),
            '" , "',
            self.CONST_HASIDP,
            '": "True" }',
        )
    )
    # CONST_SAML_COOKIE is passed for both trailing arguments of
    # create_cookie, exactly as the caller-visible behaviour requires.
    cookie = self.create_cookie(
        payload, self.CONST_SAML_COOKIE, self.CONST_SAML_COOKIE
    )

    if binding == BINDING_HTTP_ARTIFACT:
        # The artifact binding answers with an argument-less SeeOther; the
        # cookie is not attached on this path.
        return SeeOther()

    if binding == BINDING_HTTP_REDIRECT:
        # Redirect to the first "Location" header; only the cookie is sent
        # along, the remaining entries of http_args["headers"] are not.
        for name, value in http_args["headers"]:
            if name == "Location":
                return SeeOther(str(value), headers=[cookie])
        raise ServiceErrorException("Parameter error")

    # Any other binding: add the cookie to the caller's header list (the
    # list is mutated in place) and return the prepared body.
    http_args["headers"].append(cookie)
    return Response(http_args["data"], headers=http_args["headers"])
if not completed:
return val(environ, start_response)
if val:
set_cookie, cookie_value = verifier.create_cookie(val, "auth")
cookie_value += "; path=/"
url = "{base_url}?{query_string}".format(
base_url="/authorization",
query_string=kwargs["state"]["query"])
response = SeeOther(url, headers=[(set_cookie, cookie_value)])
return response(environ, start_response)
else: # Unsuccessful authentication
url = "{base_url}?{query_string}".format(
base_url="/authorization",
query_string=kwargs["state"]["query"])
response = SeeOther(url)
return response(environ, start_response)
service_url = urlencode({self.CONST_SERVICE: self.get_service_url(nonce, acr)})
cas_url = self.cas_server + self.CONST_CASLOGIN + service_url
cookie = self.create_cookie(
'{"'
+ self.CONST_NONCE
+ '": "'
+ base64.b64encode(nonce).decode()
+ '", "'
+ self.CONST_QUERY
+ '": "'
+ base64.b64encode(query).decode()
+ '"}',
self.CONST_CAS_COOKIE,
self.CONST_CAS_COOKIE,
)
return SeeOther(cas_url, headers=[cookie])
@wraps(verifier.verify)
def wrapper(environ, start_response):
    """WSGI handler: run ``verifier.verify`` on the POSTed form and redirect.

    The request body is parsed as a query string, its ``state`` field is
    URL-unquoted and JSON-decoded, and the resulting mapping is passed to
    ``verifier.verify`` as keyword arguments.

    * If verification is not completed, the first value returned by
      ``verify`` is itself called as the WSGI application.
    * Otherwise the client is redirected to ``/authorization`` with the
      query string stored in ``state["query"]``; when ``verify`` returned a
      truthy value, an ``"auth"`` cookie (with ``; path=/`` appended) is
      sent along with the redirect.
    """
    form = dict(urlparse.parse_qsl(get_post(environ)))
    # "state" arrives as URL-quoted JSON; replace it with the decoded object.
    form["state"] = json.loads(urllib.unquote(form["state"]))
    val, completed = verifier.verify(**form)

    if not completed:
        return val(environ, start_response)

    # Only a successful authentication adds headers; on failure SeeOther is
    # called with the URL alone.
    redirect_kwargs = {}
    if val:
        set_cookie, cookie_value = verifier.create_cookie(val, "auth")
        cookie_value += "; path=/"
        redirect_kwargs["headers"] = [(set_cookie, cookie_value)]

    # Built after the cookie so create_cookie still runs before the
    # state["query"] lookup, as in the success path of the original.
    target = "{base_url}?{query_string}".format(
        base_url="/authorization",
        query_string=form["state"]["query"],
    )
    redirect = SeeOther(target, **redirect_kwargs)
    return redirect(environ, start_response)
except KeyError:
pass
uid = self.handle_callback(
_dict[self.CONST_TICKET], self.get_service_url(nonce, acr)
)
if uid is None or uid == "":
logger.info("Someone tried to login, but was denied by CAS!")
return Unauthorized("You are not authorized!")
cookie = self.create_cookie(uid, "casm")
return_to = self.generate_return_url(self.return_to, uid)
if "?" in return_to:
return_to += "&"
else:
return_to += "?"
return_to += base64.b64decode(data[self.CONST_QUERY])
return SeeOther(return_to, headers=[cookie])
except Exception:
# FIXME: This should catch specific exception thrown from methods in the block
logger.fatal(
"Metod verify in user_cas.py had a fatal exception.", exc_info=True
)
return Unauthorized("You are not authorized!")
if not completed:
return val(environ, start_response)
if val:
set_cookie, cookie_value = verifier.create_cookie(val, "auth")
cookie_value += "; path=/"
url = "{base_url}?{query_string}".format(
base_url="/authorization",
query_string=kwargs["state"]["query"])
response = SeeOther(url, headers=[(set_cookie, cookie_value)])
return response(environ, start_response)
else: # Unsuccessful authentication
url = "{base_url}?{query_string}".format(
base_url="/authorization",
query_string=kwargs["state"]["query"])
response = SeeOther(url)
return response(environ, start_response)
query = parse_qs(environ["QUERY_STRING"])
if path == "logout":
try:
logoutUrl = session['client'].end_session_endpoint
plru = "{}post_logout".format(SERVER_ENV["base_url"])
logoutUrl += "?" + urlencode({"post_logout_redirect_uri": plru})
try:
logoutUrl += "&" + urlencode({
"id_token_hint": id_token_as_signed_jwt(
session['client'], "HS256")})
except AttributeError as err:
pass
session.clear()
resp = SeeOther(str(logoutUrl))
return resp(environ, start_response)
except Exception as err:
LOGGER.exception("Failed to handle logout")
if path == "post_logout":
return post_logout(environ, start_response)
if session['callback']:
_uri = "%s%s" % (conf.BASE, path)
for _cli in SERVER_ENV["OIC_CLIENT"].values():
if _uri in _cli.redirect_uris:
session['callback'] = False
func = getattr(RP, "callback")
return func(environ, SERVER_ENV, start_response, query, session)
if path == "rpAcr":