Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_authz_allow_fail(self):
attribute_allow = {
"": { "default": {"a0": ['foo1','foo2']} }
}
attribute_deny = {}
authz_service = self.create_authz_service(attribute_allow, attribute_deny)
resp = InternalData(auth_info=AuthenticationInformation())
resp.attributes = {
"a0": ["bar"],
}
try:
ctx = Context()
ctx.state = dict()
authz_service.process(ctx, resp)
assert False
except SATOSAAuthenticationError as ex:
assert True
def test_authn_response(self, backend_config, userinfo, incoming_authn_response):
self.setup_token_endpoint(
backend_config["server_info"]["token_endpoint"])
self.setup_userinfo_endpoint(
backend_config["server_info"]["user_info"], userinfo)
self.orcid_backend._authn_response(incoming_authn_response)
args = self.orcid_backend.auth_callback_func.call_args[0]
assert isinstance(args[0], Context)
assert isinstance(args[1], InternalData)
self.assert_expected_attributes(userinfo, args[1].attributes)
def test_full_flow(self, context, idp_conf, sp_conf):
test_state_key = "test_state_key_456afgrh"
response_binding = BINDING_HTTP_REDIRECT
fakeidp = FakeIdP(USERS, config=IdPConfig().load(idp_conf, metadata_construction=False))
context.state[test_state_key] = "my_state"
# start auth flow (redirecting to discovery server)
resp = self.samlbackend.start_auth(context, InternalData())
self.assert_redirect_to_discovery_server(resp, sp_conf)
# fake response from discovery server
disco_resp = parse_qs(urlparse(resp.message).query)
info = parse_qs(urlparse(disco_resp["return"][0]).query)
info["entityID"] = idp_conf["entityid"]
request_context = Context()
request_context.request = info
request_context.state = context.state
# pass discovery response to backend and check that it redirects to the selected IdP
resp = self.samlbackend.disco_response(request_context)
self.assert_redirect_to_idp(resp, idp_conf)
# fake auth response to the auth request
req_params = dict(parse_qsl(urlparse(resp.message).query))
url, fake_idp_resp = fakeidp.handle_auth_req(
req_params["SAMLRequest"],
req_params["RelayState"],
BINDING_HTTP_REDIRECT,
"testuser1",
response_binding=response_binding)
response_context = Context()
def target_context(context):
entityid_bytes = TARGET_ENTITY.encode("utf-8")
entityid_b64_str = urlsafe_b64encode(entityid_bytes).decode("utf-8")
context.decorate(Context.KEY_MIRROR_TARGET_ENTITYID, entityid_b64_str)
return context
def test_generate_mustache2(self):
synthetic_attributes = {
"": { "default": {"a0": "{{kaka.first}}#{{eppn.scope}}" }}
}
authz_service = self.create_syn_service(synthetic_attributes)
resp = InternalData(auth_info=AuthenticationInformation())
resp.attributes = {
"kaka": ["kaka1","kaka2"],
"eppn": ["a@example.com","b@example.com"]
}
ctx = Context()
ctx.state = dict()
authz_service.process(ctx, resp)
assert("kaka1#example.com" in resp.attributes['a0'])
assert("kaka1" in resp.attributes['kaka'])
assert("a@example.com" in resp.attributes['eppn'])
assert("b@example.com" in resp.attributes['eppn'])
def test_response_endpoint(self, backend_config, signing_key, incoming_authn_response):
self.setup_jwks_uri(backend_config["provider_metadata"]["jwks_uri"], signing_key)
self.setup_token_endpoint(backend_config["provider_metadata"]["token_endpoint"], signing_key)
self.setup_userinfo_endpoint(backend_config["provider_metadata"]["userinfo_endpoint"])
self.oidc_backend.response_endpoint(incoming_authn_response)
assert self.oidc_backend.name not in incoming_authn_response.state
args = self.oidc_backend.auth_callback_func.call_args[0]
assert isinstance(args[0], Context)
assert isinstance(args[1], InternalResponse)
self.assert_expected_attributes(args[1].attributes)
def test_authz_allow_second(self):
attribute_allow = {
"": { "default": {"a0": ['foo1','foo2']} }
}
attribute_deny = {}
authz_service = self.create_authz_service(attribute_allow, attribute_deny)
resp = InternalData(auth_info=AuthenticationInformation())
resp.attributes = {
"a0": ["foo2","kaka"],
}
try:
ctx = Context()
ctx.state = dict()
authz_service.process(ctx, resp)
except SATOSAAuthenticationError as ex:
assert False
request_context.request = info
request_context.state = context.state
# pass discovery response to backend and check that it redirects to the selected IdP
resp = self.samlbackend.disco_response(request_context)
self.assert_redirect_to_idp(resp, idp_conf)
# fake auth response to the auth request
req_params = dict(parse_qsl(urlparse(resp.message).query))
url, fake_idp_resp = fakeidp.handle_auth_req(
req_params["SAMLRequest"],
req_params["RelayState"],
BINDING_HTTP_REDIRECT,
"testuser1",
response_binding=response_binding)
response_context = Context()
response_context.request = fake_idp_resp
response_context.state = request_context.state
# pass auth response to backend and verify behavior
self.samlbackend.authn_response(response_context, response_binding)
context, internal_resp = self.samlbackend.auth_callback_func.call_args[0]
assert self.samlbackend.name not in context.state
assert context.state[test_state_key] == "my_state"
self.assert_authn_response(internal_resp)
def test_endpoint_routing_to_frontend(self, url_path, expected_frontend, expected_backend):
context = Context()
context.path = url_path
self.router.endpoint_routing(context)
assert context.target_frontend == expected_frontend
assert context.target_backend == expected_backend
def process(self, context, data):
target_entity_id = context.get_decoration(Context.KEY_TARGET_ENTITYID)
if None is target_entity_id:
msg = "{name} can only be used when a target entityid is set".format(
name=self.__class__.__name__
)
logger.error(msg)
raise SATOSAError(msg)
target_specific_rules = self.rules.get(target_entity_id)
# default to allowing everything if there are no specific rules
if not target_specific_rules:
logger.debug("Requester '{}' allowed by default to target entity '{}' due to no entity specific rules".format(
data.requester, target_entity_id
))
return super().process(context, data)
# deny rules takes precedence