Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Drives an auth request from a fake SP through the proxy, configured with the
# given frontend module and `oidc_backend_config` as its only backend module.
# NOTE(review): this excerpt stops right after the "create auth resp" marker;
# `parsed_auth_req` and `backend_metadata` are not used in the visible lines,
# presumably they are consumed by the part that is cut off -- confirm upstream.
def run_test(self, satosa_config_dict, sp_conf, oidc_backend_config, frontend_config):
subject_id = "testuser1"
# proxy config
satosa_config_dict["FRONTEND_MODULES"] = [frontend_config]
satosa_config_dict["BACKEND_MODULES"] = [oidc_backend_config]
# Every attribute name of USERS[subject_id] is mapped to itself under both
# the "openid" and the "saml" keys.
satosa_config_dict["INTERNAL_ATTRIBUTES"]["attributes"] = {attr_name: {"openid": [attr_name],
"saml": [attr_name]}
for attr_name in USERS[subject_id]}
frontend_metadata, backend_metadata = create_entity_descriptors(SATOSAConfig(satosa_config_dict))
# application
test_client = Client(make_app(SATOSAConfig(satosa_config_dict)), BaseResponse)
# config test SP
# The first entity descriptor generated for this frontend is handed to the
# fake SP as inline metadata (note: this appends to the caller's sp_conf).
frontend_metadata_str = str(frontend_metadata[frontend_config["name"]][0])
sp_conf["metadata"]["inline"].append(frontend_metadata_str)
fakesp = FakeSP(SPConfig().load(sp_conf, metadata_construction=False))
# create auth req
destination, req_args = fakesp.make_auth_req(frontend_metadata[frontend_config["name"]][0].entity_id)
# Only the path of the destination URL is kept; the request arguments are
# url-encoded into the query string.
auth_req = urlparse(destination).path + "?" + urlencode(req_args)
# make auth req to proxy
proxied_auth_req = test_client.get(auth_req)
assert proxied_auth_req.status == "302 Found"
# The response body is decoded as UTF-8, parsed as a URL, and its query
# string is turned into a dict.
parsed_auth_req = dict(parse_qsl(urlparse(proxied_auth_req.data.decode("utf-8")).query))
# create auth resp
# Drives an auth request from a fake SP through the proxy, configured with the
# given frontend module and `oidc_backend_config` as its only backend module,
# and checks that the proxy answers with "302 Found".
# NOTE(review): `backend_metadata` is not used in the visible lines; this
# excerpt may be cut short -- confirm against the full file.
def run_test(self, satosa_config_dict, sp_conf, oidc_backend_config, frontend_config):
subject_id = "testuser1"
# proxy config
satosa_config_dict["FRONTEND_MODULES"] = [frontend_config]
satosa_config_dict["BACKEND_MODULES"] = [oidc_backend_config]
# Every attribute name of USERS[subject_id] is mapped to itself under both
# the "openid" and the "saml" keys.
satosa_config_dict["INTERNAL_ATTRIBUTES"]["attributes"] = {attr_name: {"openid": [attr_name],
"saml": [attr_name]}
for attr_name in USERS[subject_id]}
frontend_metadata, backend_metadata = create_entity_descriptors(SATOSAConfig(satosa_config_dict))
# application
test_client = Client(make_app(SATOSAConfig(satosa_config_dict)), BaseResponse)
# config test SP
# The first entity descriptor generated for this frontend is handed to the
# fake SP as inline metadata (note: this appends to the caller's sp_conf).
frontend_metadata_str = str(frontend_metadata[frontend_config["name"]][0])
sp_conf["metadata"]["inline"].append(frontend_metadata_str)
fakesp = FakeSP(SPConfig().load(sp_conf, metadata_construction=False))
# create auth req
destination, req_args = fakesp.make_auth_req(frontend_metadata[frontend_config["name"]][0].entity_id)
# Only the path of the destination URL is kept; the request arguments are
# url-encoded into the query string.
auth_req = urlparse(destination).path + "?" + urlencode(req_args)
# make auth req to proxy
proxied_auth_req = test_client.get(auth_req)
assert proxied_auth_req.status == "302 Found"
# Drives an auth request from a fake SP through the proxy, configured with the
# given frontend module and `saml_backend_config` as its only backend module,
# and checks that the proxy answers with "303 See Other".
# NOTE(review): `idp_conf` and `backend_metadata_str` are not used in the
# visible lines; the excerpt appears to stop at the start of the test-IdP
# setup -- confirm against the full file.
def run_test(self, satosa_config_dict, sp_conf, idp_conf, saml_backend_config, frontend_config):
subject_id = "testuser1"
# proxy config
satosa_config_dict["FRONTEND_MODULES"] = [frontend_config]
satosa_config_dict["BACKEND_MODULES"] = [saml_backend_config]
# Every attribute name of USERS[subject_id] is mapped to itself under the
# "saml" key only.
satosa_config_dict["INTERNAL_ATTRIBUTES"]["attributes"] = {attr_name: {"saml": [attr_name]} for attr_name in
USERS[subject_id]}
frontend_metadata, backend_metadata = create_entity_descriptors(SATOSAConfig(satosa_config_dict))
# application
test_client = Client(make_app(SATOSAConfig(satosa_config_dict)), BaseResponse)
# config test SP
# The first entity descriptor generated for this frontend is handed to the
# fake SP as inline metadata (note: this appends to the caller's sp_conf).
frontend_metadata_str = str(frontend_metadata[frontend_config["name"]][0])
sp_conf["metadata"]["inline"].append(frontend_metadata_str)
fakesp = FakeSP(SPConfig().load(sp_conf, metadata_construction=False))
# create auth req
destination, req_args = fakesp.make_auth_req(frontend_metadata[frontend_config["name"]][0].entity_id)
# Only the path of the destination URL is kept; the request arguments are
# url-encoded into the query string.
auth_req = urlparse(destination).path + "?" + urlencode(req_args)
# make auth req to proxy
proxied_auth_req = test_client.get(auth_req)
assert proxied_auth_req.status == "303 See Other"
# config test IdP
# String form of the first entity descriptor generated for the SAML backend.
backend_metadata_str = str(backend_metadata[saml_backend_config["name"]][0])
def test_filter_one_attribute_for_one_target_provider(self):
    """Filter attribute ``a1`` for responses issued by one target provider.

    The filter is keyed by the provider name with an empty-string requester
    key; only the ``a1`` value containing ``foo:bar`` is expected to remain.
    """
    provider = "test_provider"
    service = self.create_filter_service({provider: {"": {"a1": "foo:bar"}}})

    internal = InternalData(auth_info=AuthenticationInformation(issuer=provider))
    internal.attributes = {"a1": ["abc:xyz", "1:foo:bar:2"]}

    result = service.process(None, internal)
    assert result.attributes == {"a1": ["1:foo:bar:2"]}
def test_authz_allow_second(self):
    """Expect no authentication error when ``a0`` carries an allowed value.

    The allow rule lists ``foo1`` and ``foo2`` for attribute ``a0``; the
    response carries ``foo2`` (the second listed value) plus the unrelated
    value ``kaka``. ``process`` must not raise ``SATOSAAuthenticationError``.
    """
    attribute_allow = {
        "": {"default": {"a0": ["foo1", "foo2"]}},
    }
    attribute_deny = {}
    authz_service = self.create_authz_service(attribute_allow, attribute_deny)

    resp = InternalData(auth_info=AuthenticationInformation())
    resp.attributes = {
        "a0": ["foo2", "kaka"],
    }

    ctx = Context()
    ctx.state = dict()

    # Only the call under test sits inside the try block. Raising explicitly
    # (rather than a bare ``assert False``) keeps the check active under
    # ``python -O`` and chains the original error into the failure report.
    try:
        authz_service.process(ctx, resp)
    except SATOSAAuthenticationError as ex:
        raise AssertionError(
            "authorization unexpectedly denied a response with an allowed value"
        ) from ex
def test_authz_allow_fail(self):
    """Expect an authentication error when ``a0`` has no allowed value.

    The allow rule lists ``foo1`` and ``foo2`` for attribute ``a0``; the
    response only carries ``bar``. ``process`` must raise
    ``SATOSAAuthenticationError``.
    """
    attribute_allow = {
        "": {"default": {"a0": ["foo1", "foo2"]}},
    }
    attribute_deny = {}
    authz_service = self.create_authz_service(attribute_allow, attribute_deny)

    resp = InternalData(auth_info=AuthenticationInformation())
    resp.attributes = {
        "a0": ["bar"],
    }

    ctx = Context()
    ctx.state = dict()

    try:
        authz_service.process(ctx, resp)
    except SATOSAAuthenticationError:
        # Expected outcome: the service rejected the response.
        pass
    else:
        # An explicit raise (instead of ``assert False``) is not stripped
        # under ``python -O`` and tells the reader what went wrong.
        raise AssertionError(
            "expected SATOSAAuthenticationError for a response without an allowed value"
        )
# Runs the LDAP attribute store over every mock person record and starts to
# check the attributes it adds to the response.
# NOTE(review): this excerpt ends on the `if ldap_attr in ldap_to_internal_map:`
# line without a body, so the actual assertions are not visible here.
def test_attributes_general(self, ldap_attribute_store):
# Mapping read from the 'default' section of the store configuration.
ldap_to_internal_map = (self.ldap_attribute_store_config['default']
['ldap_to_internal_map'])
for dn, attributes in self.ldap_person_records:
# Mock up the internal response the LDAP attribute store is
# expecting to receive.
response = InternalData(auth_info=AuthenticationInformation())
# The LDAP attribute store configuration and the mock records
# expect to use a LDAP search filter for the uid attribute.
uid = attributes['uid']
response.attributes = {'uid': uid}
# A fresh context with an empty state dict is used for each record.
context = Context()
context.state = dict()
ldap_attribute_store.process(context, response)
# Verify that the LDAP attribute store has retrieved the mock
# records from the mock LDAP server and has added the appropriate
# internal attributes.
for ldap_attr, ldap_value in attributes.items():
# Only LDAP attributes that have an entry in the mapping are considered.
if ldap_attr in ldap_to_internal_map:
def test_authz_deny_success(self):
    """Expect an authentication error when ``a0`` carries a denied value.

    The deny rule lists ``foo1`` and ``foo2`` for attribute ``a0``; the
    response carries ``foo2``. ``process`` must raise
    ``SATOSAAuthenticationError``.
    """
    attribute_deny = {
        "": {"default": {"a0": ["foo1", "foo2"]}},
    }
    attribute_allow = {}
    authz_service = self.create_authz_service(attribute_allow, attribute_deny)

    resp = InternalData(auth_info=AuthenticationInformation())
    resp.attributes = {
        "a0": ["foo2"],
    }

    ctx = Context()
    ctx.state = dict()

    try:
        authz_service.process(ctx, resp)
    except SATOSAAuthenticationError:
        # Expected outcome: the service rejected the response.
        pass
    else:
        # An explicit raise (instead of ``assert False``) is not stripped
        # under ``python -O`` and tells the reader what went wrong.
        raise AssertionError(
            "expected SATOSAAuthenticationError for a response with a denied value"
        )
def test_filter_one_attribute_from_all_target_providers_for_one_requester(self):
    """Filter attribute ``a1`` for one requester under the empty provider key.

    The filter is registered under the empty-string provider key and the
    requester name; only the ``a1`` value containing ``foo:bar`` is expected
    to remain.
    """
    requester_name = "test_requester"
    service = self.create_filter_service({"": {requester_name: {"a1": "foo:bar"}}})

    internal = InternalData(auth_info=AuthenticationInformation())
    internal.requester = requester_name
    internal.attributes = {"a1": ["abc:xyz", "1:foo:bar:2"]}

    result = service.process(None, internal)
    assert result.attributes == {"a1": ["1:foo:bar:2"]}
def test_auth_resp_callback_func_user_id_from_attrs_is_used_to_override_user_id(self, context, satosa_config):
    """Check that ``user_id_from_attrs`` determines the response's subject id.

    With ``user_id_from_attrs`` set to ``["user_id", "domain"]`` and those
    attributes holding ``"user"`` and ``"@example.com"``, the subject id after
    the callback is expected to be ``"user@example.com"``.
    """
    internal_attributes = satosa_config["INTERNAL_ATTRIBUTES"]
    internal_attributes["user_id_from_attrs"] = ["user_id", "domain"]
    proxy = SATOSABase(satosa_config)

    response = InternalData(auth_info=AuthenticationInformation("", "", ""))
    response.attributes = {"user_id": ["user"], "domain": ["@example.com"]}
    response.requester = "test_requester"

    # State the callback is given: the requester and the frontend's name.
    context.state[satosa.base.STATE_KEY] = {"requester": "test_requester"}
    first_frontend = satosa_config["FRONTEND_MODULES"][0]
    context.state[satosa.routing.STATE_KEY] = first_frontend["name"]

    proxy._auth_resp_callback_func(context, response)

    assert response.subject_id == "user@example.com"