Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_handle_backend_error(self, context, frontend):
redirect_uri = "https://client.example.com"
areq = AuthorizationRequest(client_id=CLIENT_ID, scope="openid", response_type="id_token",
redirect_uri=redirect_uri)
context.state[frontend.name] = {"oidc_request": areq.to_urlencoded()}
# fake an error
message = "test error"
error = SATOSAAuthenticationError(context.state, message)
resp = frontend.handle_backend_error(error)
assert resp.message.startswith(redirect_uri)
error_response = AuthorizationErrorResponse().deserialize(urlparse(resp.message).fragment)
error_response["error"] = "access_denied"
error_response["error_description"] == message
data_list = ["1", "2", "3"]
service_list = []
fail_service = MicroService()
fail_service.process = create_process_fail_func("4")
service_list.append(fail_service)
for d in data_list:
service = MicroService()
service.process = create_process_func(d)
service_list.append(service)
service_queue = build_micro_service_queue(service_list)
test_data = "test_data"
with pytest.raises(SATOSAAuthenticationError):
service_queue.process_service_queue(context, test_data)
ht_args = self.sp.apply_binding(binding, "%s" % req, destination, relay_state=relay_state)
msg = "ht_args: {}".format(ht_args)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
except Exception as exc:
msg = "Failed to construct the AuthnRequest for state"
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline, exc_info=True)
raise SATOSAAuthenticationError(context.state, "Failed to construct the AuthnRequest") from exc
if self.sp.config.getattr('allow_unsolicited', 'sp') is False:
if req_id in self.outstanding_queries:
msg = "Request with duplicate id {}".format(req_id)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
raise SATOSAAuthenticationError(context.state, msg)
self.outstanding_queries[req_id] = req
context.state[self.name] = {"relay_state": relay_state}
return make_saml_response(binding, ht_args)
:type context: satosa.context.Context
:rtype: satosa.response.Response
:param context: The current context
:return: response
"""
info = context.request
state = context.state
try:
entity_id = info["entityID"]
except KeyError as err:
msg = "No IDP chosen for state"
logline = lu.LOG_FMT.format(id=lu.get_session_id(state), message=msg)
logger.debug(logline, exc_info=True)
raise SATOSAAuthenticationError(state, "No IDP chosen") from err
return self.authn_request(context, entity_id)
:type resp: AuthorizationResponse
:type state_data: dict[str, str]
:type state: satosa.state.State
:param resp: The authorization response from the AS, created by pyoidc.
:param state_data: The state data for this backend.
:param state: The current state for the proxy and this backend.
Only used for raising errors.
"""
is_known_state = "state" in resp and "state" in state_data and resp["state"] == state_data["state"]
if not is_known_state:
received_state = resp.get("state", "")
msg = "Missing or invalid state [{}] in response!".format(received_state)
logline = lu.LOG_FMT.format(id=lu.get_session_id(state), message=msg)
logger.debug(logline)
raise SATOSAAuthenticationError(state,
"Missing or invalid state [%s] in response!" %
received_state)
logger.debug(logline)
acs_endp, response_binding = self.sp.config.getattr("endpoints", "sp")["assertion_consumer_service"][0]
req_id, req = self.sp.create_authn_request(
destination, binding=response_binding, **kwargs
)
relay_state = util.rndstr()
ht_args = self.sp.apply_binding(binding, "%s" % req, destination, relay_state=relay_state)
msg = "ht_args: {}".format(ht_args)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
except Exception as exc:
msg = "Failed to construct the AuthnRequest for state"
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline, exc_info=True)
raise SATOSAAuthenticationError(context.state, "Failed to construct the AuthnRequest") from exc
if self.sp.config.getattr('allow_unsolicited', 'sp') is False:
if req_id in self.outstanding_queries:
msg = "Request with duplicate id {}".format(req_id)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
raise SATOSAAuthenticationError(context.state, msg)
self.outstanding_queries[req_id] = req
context.state[self.name] = {"relay_state": relay_state}
return make_saml_response(binding, ht_args)
if self.sp.config.getattr('allow_unsolicited', 'sp') is False:
req_id = authn_response.in_response_to
if req_id not in self.outstanding_queries:
msg = "No request with id: {}".format(req_id),
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
raise SATOSAAuthenticationError(context.state, msg)
del self.outstanding_queries[req_id]
# check if the relay_state matches the cookie state
if context.state[self.name]["relay_state"] != context.request["RelayState"]:
msg = "State did not match relay state for state"
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
raise SATOSAAuthenticationError(context.state, "State did not match relay state")
context.decorate(Context.KEY_METADATA_STORE, self.sp.metadata)
if self.config.get(SAMLBackend.KEY_MEMORIZE_IDP):
issuer = authn_response.response.issuer.text.strip()
context.state[Context.KEY_MEMORIZED_IDP] = issuer
context.state.pop(self.name, None)
context.state.pop(Context.KEY_FORCE_AUTHN, None)
return self.auth_callback_func(context, self._translate_response(authn_response, context.state))
:param context: The current context
:param entity_id: Target IDP entity id
:return: response to the user agent
"""
# If IDP blacklisting is enabled and the selected IDP is blacklisted,
# stop here
if self.idp_blacklist_file:
with open(self.idp_blacklist_file) as blacklist_file:
blacklist_array = json.load(blacklist_file)['blacklist']
if entity_id in blacklist_array:
msg = "IdP with EntityID {} is blacklisted".format(entity_id)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline, exc_info=False)
raise SATOSAAuthenticationError(context.state, "Selected IdP is blacklisted for this backend")
kwargs = {}
authn_context = self.construct_requested_authn_context(entity_id)
if authn_context:
kwargs["requested_authn_context"] = authn_context
if self.config.get(SAMLBackend.KEY_MIRROR_FORCE_AUTHN):
kwargs["force_authn"] = get_force_authn(
context, self.config, self.sp.config
)
try:
binding, destination = self.sp.pick_binding(
"single_sign_on_service", None, "idpsso", entity_id=entity_id
)
msg = "binding: {}, destination: {}".format(binding, destination)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)