Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run_test(self, satosa_config_dict, sp_conf, oidc_backend_config, frontend_config):
    """Send a fake SP's authentication request through the proxy to the OIDC backend.

    Configures the proxy with the given frontend and OIDC backend modules,
    registers the generated frontend metadata with a fake SP and checks that
    the proxy answers the SP's authentication request with "302 Found".

    :param satosa_config_dict: proxy configuration; modified in place
    :param sp_conf: configuration of the test SP; the frontend metadata is
        appended to its inline metadata
    :param oidc_backend_config: backend module configuration
    :param frontend_config: frontend module configuration
    """
    subject_id = "testuser1"

    # proxy config
    satosa_config_dict["FRONTEND_MODULES"] = [frontend_config]
    satosa_config_dict["BACKEND_MODULES"] = [oidc_backend_config]
    # each attribute of the test user is mapped to itself for both profiles
    internal_attributes = {}
    for attr_name in USERS[subject_id]:
        internal_attributes[attr_name] = {"openid": [attr_name], "saml": [attr_name]}
    satosa_config_dict["INTERNAL_ATTRIBUTES"]["attributes"] = internal_attributes
    frontend_metadata, backend_metadata = create_entity_descriptors(SATOSAConfig(satosa_config_dict))

    # application
    wsgi_app = make_app(SATOSAConfig(satosa_config_dict))
    test_client = Client(wsgi_app, BaseResponse)

    # config test SP
    frontend_entity = frontend_metadata[frontend_config["name"]][0]
    frontend_metadata_str = str(frontend_entity)
    sp_conf["metadata"]["inline"].append(frontend_metadata_str)
    loaded_sp_conf = SPConfig().load(sp_conf, metadata_construction=False)
    fakesp = FakeSP(loaded_sp_conf)

    # create auth req
    destination, req_args = fakesp.make_auth_req(frontend_entity.entity_id)
    auth_req = "{}?{}".format(urlparse(destination).path, urlencode(req_args))

    # make auth req to proxy
    proxied_auth_req = test_client.get(auth_req)
    assert proxied_auth_req.status == "302 Found"
    # the response body is parsed as a URL and its query string turned into a dict
    response_body = proxied_auth_req.data.decode("utf-8")
    parsed_auth_req = dict(parse_qsl(urlparse(response_body).query))

    # create auth resp
def run_test(self, satosa_config_dict, sp_conf, idp_conf, saml_backend_config, frontend_config):
    """Send a fake SP's authentication request through the proxy to the SAML backend.

    Configures the proxy with the given frontend and SAML backend modules,
    registers the generated frontend metadata with a fake SP and checks that
    the proxy answers the SP's authentication request with "303 See Other".

    :param satosa_config_dict: proxy configuration; modified in place
    :param sp_conf: configuration of the test SP; the frontend metadata is
        appended to its inline metadata
    :param idp_conf: configuration of the test IdP
    :param saml_backend_config: backend module configuration
    :param frontend_config: frontend module configuration
    """
    subject_id = "testuser1"

    # proxy config
    satosa_config_dict["FRONTEND_MODULES"] = [frontend_config]
    satosa_config_dict["BACKEND_MODULES"] = [saml_backend_config]
    # each attribute of the test user is mapped to itself for the SAML profile
    internal_attributes = {}
    for attr_name in USERS[subject_id]:
        internal_attributes[attr_name] = {"saml": [attr_name]}
    satosa_config_dict["INTERNAL_ATTRIBUTES"]["attributes"] = internal_attributes
    frontend_metadata, backend_metadata = create_entity_descriptors(SATOSAConfig(satosa_config_dict))

    # application
    wsgi_app = make_app(SATOSAConfig(satosa_config_dict))
    test_client = Client(wsgi_app, BaseResponse)

    # config test SP
    frontend_entity = frontend_metadata[frontend_config["name"]][0]
    frontend_metadata_str = str(frontend_entity)
    sp_conf["metadata"]["inline"].append(frontend_metadata_str)
    loaded_sp_conf = SPConfig().load(sp_conf, metadata_construction=False)
    fakesp = FakeSP(loaded_sp_conf)

    # create auth req
    destination, req_args = fakesp.make_auth_req(frontend_entity.entity_id)
    auth_req = "{}?{}".format(urlparse(destination).path, urlencode(req_args))

    # make auth req to proxy
    proxied_auth_req = test_client.get(auth_req)
    assert proxied_auth_req.status == "303 See Other"

    # config test IdP
    backend_entity = backend_metadata[saml_backend_config["name"]][0]
    backend_metadata_str = str(backend_entity)
def test_full_flow(self, satosa_config_dict, oidc_frontend_config, saml_backend_config, idp_conf):
    """Send an OpenID Connect authentication request through the proxy to the SAML backend.

    Configures the proxy with the OIDC frontend and SAML backend modules,
    reads the frontend's provider configuration from its well-known endpoint,
    and checks that the proxy answers an authentication request asking for
    all of the test user's claims with "303 See Other".

    :param satosa_config_dict: proxy configuration; modified in place
    :param oidc_frontend_config: frontend module configuration
    :param saml_backend_config: backend module configuration
    :param idp_conf: configuration of the test IdP
    """
    subject_id = "testuser1"

    # proxy config
    satosa_config_dict["FRONTEND_MODULES"] = [oidc_frontend_config]
    satosa_config_dict["BACKEND_MODULES"] = [saml_backend_config]
    # each attribute of the test user is mapped to itself for both profiles
    internal_attributes = {}
    for attr_name in USERS[subject_id]:
        internal_attributes[attr_name] = {"openid": [attr_name], "saml": [attr_name]}
    satosa_config_dict["INTERNAL_ATTRIBUTES"]["attributes"] = internal_attributes
    _, backend_metadata = create_entity_descriptors(SATOSAConfig(satosa_config_dict))

    # application
    wsgi_app = make_app(SATOSAConfig(satosa_config_dict))
    test_client = Client(wsgi_app, BaseResponse)

    # get frontend OP config info
    discovery_resp = test_client.get("/.well-known/openid-configuration")
    provider_config = json.loads(discovery_resp.data.decode("utf-8"))

    # create auth req
    # every claim is requested with a None value
    requested_claims = dict.fromkeys(USERS[subject_id])
    claims_request = ClaimsRequest(id_token=Claims(**requested_claims))
    req_args = {
        "scope": "openid",
        "response_type": "id_token",
        "client_id": CLIENT_ID,
        "redirect_uri": REDIRECT_URI,
        "nonce": "nonce",
        "claims": claims_request.to_json(),
    }
    authorization_path = urlparse(provider_config["authorization_endpoint"]).path
    auth_req = "{}?{}".format(authorization_path, urlencode(req_args))

    # make auth req to proxy
    proxied_auth_req = test_client.get(auth_req)
    assert proxied_auth_req.status == "303 See Other"

    # config test IdP
def test_full_flow(self, satosa_config_dict, consent_module_config):
    """Run an authentication flow through the proxy with the consent micro service enabled.

    Points the consent module at a fake consent service, appends it to the
    configured micro services, sends an authentication request to the first
    configured backend/frontend pair and then delivers an authentication
    response while the consent service's HTTP endpoints are mocked.

    :param satosa_config_dict: proxy configuration; modified in place
    :param consent_module_config: consent micro service configuration;
        its ``api_url`` and ``redirect_url`` are overwritten
    """
    api_url = "https://consent.example.com/api"
    redirect_url = "https://consent.example.com/redirect"
    consent_module_config["config"]["api_url"] = api_url
    consent_module_config["config"]["redirect_url"] = redirect_url
    satosa_config_dict["MICRO_SERVICES"].append(consent_module_config)

    # application
    test_client = Client(make_app(SATOSAConfig(satosa_config_dict)), BaseResponse)

    # incoming auth req
    backend_name = satosa_config_dict["BACKEND_MODULES"][0]["name"]
    frontend_name = satosa_config_dict["FRONTEND_MODULES"][0]["name"]
    http_resp = test_client.get("/{}/{}/request".format(backend_name, frontend_name))
    assert http_resp.status_code == 200

    # The API URL is escaped so that its dots only match literal dots; the
    # mocked endpoints must not match look-alike hosts.
    escaped_api_url = re.escape(api_url)
    verify_url_re = re.compile(r"{}/verify/\w+".format(escaped_api_url))
    with responses.RequestsMock() as rsps:
        # fake no previous consent
        consent_request_url_re = re.compile(r"{}/creq/\w+".format(escaped_api_url))
        rsps.add(responses.GET, verify_url_re, status=401)
        rsps.add(responses.GET, consent_request_url_re, "test_ticket", status=200)

        # incoming auth resp
        http_resp = test_client.get("/{}/response".format(satosa_config_dict["BACKEND_MODULES"][0]["name"]))
def test_full_flow(self, satosa_config_dict, account_linking_module_config):
    """Run an authentication flow through the proxy with account linking enabled.

    Points the account linking module at a fake service, inserts it as the
    first micro service, sends an authentication request to the first
    configured backend/frontend pair, and checks that an authentication
    response for a user without a linked account is redirected to the
    account linking service.

    :param satosa_config_dict: proxy configuration; modified in place
    :param account_linking_module_config: account linking micro service
        configuration; its ``api_url`` and ``redirect_url`` are overwritten
    """
    api_url = "https://alservice.example.com/api"
    # same reserved example.com host as api_url, so the test never refers to
    # a domain that could actually be registered
    redirect_url = "https://alservice.example.com/redirect"
    account_linking_module_config["config"]["api_url"] = api_url
    account_linking_module_config["config"]["redirect_url"] = redirect_url
    satosa_config_dict["MICRO_SERVICES"].insert(0, account_linking_module_config)

    # application
    test_client = Client(make_app(SATOSAConfig(satosa_config_dict)), BaseResponse)

    # incoming auth req
    backend_name = satosa_config_dict["BACKEND_MODULES"][0]["name"]
    frontend_name = satosa_config_dict["FRONTEND_MODULES"][0]["name"]
    http_resp = test_client.get("/{}/{}/request".format(backend_name, frontend_name))
    assert http_resp.status_code == 200

    with responses.RequestsMock() as rsps:
        # fake no previous account linking
        rsps.add(responses.GET, "{}/get_id".format(api_url), "test_ticket", status=404)

        # incoming auth resp
        http_resp = test_client.get("/{}/response".format(satosa_config_dict["BACKEND_MODULES"][0]["name"]))
        assert http_resp.status_code == 302
        assert http_resp.headers["Location"].startswith(redirect_url)
with responses.RequestsMock() as rsps:
def test_unknown_request_path(self, satosa_config_dict):
    """Check that a request to an unregistered path yields the NotFound status.

    :param satosa_config_dict: proxy configuration used to build the app
    """
    wsgi_app = make_app(SATOSAConfig(satosa_config_dict))
    client = Client(wsgi_app, BaseResponse)

    response = client.get('/unknown')

    expected_status = NotFound._status
    assert response.status == expected_status
def test_flow(self, satosa_config_dict):
    """
    Sends a request to the frontend endpoint, then a response to the backend
    endpoint carrying the cookie set by the first reply, and checks both
    replies.

    :param satosa_config_dict: proxy configuration used to build the app
    """
    wsgi_app = make_app(SATOSAConfig(satosa_config_dict))
    test_client = Client(wsgi_app, BaseResponse)

    # Make request to frontend
    request_path = '/{}/{}/request'.format("backend", "frontend")
    resp = test_client.get(request_path)
    assert resp.status == '200 OK'
    # the cookie set by the first reply is replayed on the next request
    cookie = dict(resp.headers)["Set-Cookie"]
    assert cookie

    # Fake response coming in to backend
    response_path = '/{}/response'.format("backend")
    resp = test_client.get(response_path, headers=[("Cookie", cookie)])
    assert resp.status == '200 OK'
    body = resp.data.decode('utf-8')
    assert body == "Auth response received, passed to test frontend"
import argparse
import os
import sys
from werkzeug.serving import run_simple
from satosa.proxy_server import make_app
from satosa.satosa_config import SATOSAConfig
# Path of the proxy configuration; the SATOSA_CONFIG environment variable
# overrides the default file name.
config_file = os.getenv("SATOSA_CONFIG", "proxy_conf.yaml")
# Module-level WSGI application built from that configuration at import time.
satosa_config = SATOSAConfig(config_file)
app = make_app(satosa_config)
def main():
global app
parser = argparse.ArgumentParser(description="Process some integers.")
parser.add_argument("port", type=int)
parser.add_argument("--keyfile", type=str)
parser.add_argument("--certfile", type=str)
parser.add_argument("--host", type=str)
args = parser.parse_args()
if (args.keyfile and not args.certfile) or (args.certfile and not args.keyfile):
print("Both keyfile and certfile must be specified for HTTPS.")
sys.exit(1)