Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_link_micro_services(self, satosa_config, micro_services):
    """Check that ``_link_micro_services`` chains the services in order.

    After linking, every service's ``next`` must equal the ``process``
    attribute of the service that follows it, and the last service's
    ``next`` must equal the finish callable.
    """
    proxy = SATOSABase(satosa_config)
    on_finish = Mock()
    proxy._link_micro_services(micro_services, on_finish)

    # Walk neighbouring pairs instead of indexing by position.
    for current, following in zip(micro_services, micro_services[1:]):
        assert current.next == following.process
    assert micro_services[-1].next == on_finish
def test_link_micro_services_with_invalid_input(self, satosa_config, micro_services):
    """Check that linking the supplied ``micro_services`` does not raise.

    There are no assertions: the test passes as long as
    ``_link_micro_services`` returns normally.
    """
    proxy = SATOSABase(satosa_config)
    # The call itself is the check -- it must complete without an exception.
    proxy._link_micro_services(micro_services, Mock())
def test_auth_resp_callback_func_respects_user_id_to_attr(self, context, satosa_config):
    """Check that the subject id is copied into the configured attribute.

    With ``INTERNAL_ATTRIBUTES["user_id_to_attr"]`` set to ``"user_id"``,
    ``_auth_resp_callback_func`` must leave ``attributes["user_id"]`` equal
    to a one-element list holding the response's ``subject_id``.
    """
    target_attr = "user_id"
    satosa_config["INTERNAL_ATTRIBUTES"]["user_id_to_attr"] = target_attr
    proxy = SATOSABase(satosa_config)

    response = InternalData(auth_info=AuthenticationInformation("", "", ""))
    response.subject_id = "user1234"

    # State entries written before the callback is invoked.
    frontend_name = satosa_config["FRONTEND_MODULES"][0]["name"]
    context.state[satosa.base.STATE_KEY] = {"requester": "test_requester"}
    context.state[satosa.routing.STATE_KEY] = frontend_name

    proxy._auth_resp_callback_func(context, response)

    assert response.attributes[target_attr] == [response.subject_id]
def test_full_initialisation(self, satosa_config):
    """Check what a ``SATOSABase`` built from ``satosa_config`` exposes.

    The instance must keep the configuration it was given, and its module
    router and both micro-service collections must each hold exactly one
    entry.
    """
    proxy = SATOSABase(satosa_config)
    assert proxy.config == satosa_config

    router = proxy.module_router
    assert len(router.frontends) == 1
    assert len(router.backends) == 1

    assert len(proxy.request_micro_services) == 1
    assert len(proxy.response_micro_services) == 1
def __init__(self, config, debug=False):
    """Store the debug flag and build the wrapped ``SATOSABase``.

    :param config: configuration handed to ``SATOSABase``; must not be ``None``
    :param debug: value stored as ``self.debug``
    :raises ValueError: if ``config`` is ``None``
    """
    # The flag is assigned before validation, so it is set even when the
    # constructor goes on to raise.
    self.debug = debug
    if config is not None:
        self.satosa = SATOSABase(config)
    else:
        raise ValueError("Missing configuration")
def test_auth_resp_callback_func_user_id_from_attrs_is_used_to_override_user_id(
        self, context, satosa_config):
    """Check that ``user_id_from_attrs`` determines the resulting subject id.

    With ``INTERNAL_ATTRIBUTES["user_id_from_attrs"]`` set to
    ``["user_id", "domain"]`` and the response carrying the attribute values
    ``["user"]`` and ``["@example.com"]``, ``_auth_resp_callback_func`` must
    leave ``subject_id`` equal to ``"user@example.com"``.
    """
    satosa_config["INTERNAL_ATTRIBUTES"]["user_id_from_attrs"] = ["user_id", "domain"]
    proxy = SATOSABase(satosa_config)

    response = InternalData(auth_info=AuthenticationInformation("", "", ""))
    response.attributes = {"user_id": ["user"], "domain": ["@example.com"]}
    response.requester = "test_requester"

    # State entries written before the callback is invoked.
    frontend_name = satosa_config["FRONTEND_MODULES"][0]["name"]
    context.state[satosa.base.STATE_KEY] = {"requester": "test_requester"}
    context.state[satosa.routing.STATE_KEY] = frontend_name

    proxy._auth_resp_callback_func(context, response)

    assert response.subject_id == "user@example.com"
def __init__(self, config, debug=False):
    """Record ``debug`` and wrap a ``SATOSABase`` created from ``config``.

    :param config: configuration for ``SATOSABase``; ``None`` is rejected
    :param debug: stored unchanged on the instance as ``self.debug``
    :raises ValueError: when ``config`` is ``None``
    """
    # ``debug`` is stored first; it is present even if validation fails.
    self.debug = debug
    if config is not None:
        self.satosa = SATOSABase(config)
    else:
        raise ValueError("Missing configuration")
from satosa.base import SATOSABase
from satosa.context import Context
from satosa.routing import SATOSANoBoundEndpointError
from satosa.service import unpack_either
# Log file name and the line format used by the handler below.
LOGFILE_NAME = 's2s.log'
base_formatter = logging.Formatter("[%(asctime)-19.19s] [%(levelname)-5.5s]: %(message)s")

# File handler writing formatted records to LOGFILE_NAME.
hdlr = logging.FileHandler(LOGFILE_NAME)
hdlr.setFormatter(base_formatter)

# An empty name selects the root logger; it is set to DEBUG and given the
# file handler at import time.
LOGGER = logging.getLogger("")
LOGGER.setLevel(logging.DEBUG)
LOGGER.addHandler(hdlr)
class WsgiApplication(SATOSABase):
def __init__(self, config, debug=False):
    """Initialise the parent class with ``config`` and store the debug flag.

    :param config: passed unchanged to the parent constructor
    :param debug: value stored as ``self.debug``
    """
    super(WsgiApplication, self).__init__(config)
    # Assigned only after the parent constructor has run.
    self.debug = debug
def run_server(self, environ, start_response):
path = environ.get('PATH_INFO', '').lstrip('/')
if ".." in path:
resp = Unauthorized()
return resp(environ, start_response)
context = Context()
context.path = path
context.request = unpack_either(environ)
context.cookie = environ.get("HTTP_COOKIE", "")
try: