Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
See super class satosa.frontends.base.FrontendModule
:type exception: satosa.exception.SATOSAError
:rtype: oic.utils.http_util.Response
"""
auth_req = self._get_authn_request_from_state(exception.state)
# If the client sent us a state parameter, we should reflect it back according to the spec
if 'state' in auth_req:
error_resp = AuthorizationErrorResponse(error="access_denied",
error_description=exception.message,
state=auth_req['state'])
else:
error_resp = AuthorizationErrorResponse(error="access_denied",
error_description=exception.message)
msg = exception.message
logline = lu.LOG_FMT.format(id=lu.get_session_id(exception.state), message=msg)
logger.debug(logline)
return SeeOther(error_resp.request(auth_req["redirect_uri"], should_fragment_encode(auth_req)))
def _handle_authn_request(self, context):
    """
    Parse and verify the client's authentication request.

    The incoming request parameters are url-encoded, logged at debug level
    and handed to ``self.provider.parse_authentication_request``. When the
    provider rejects the request, the failure is logged at error level and
    an error response is returned: a redirect to the error URL supplied by
    the exception when it provides one, otherwise a bad-request response
    carrying the exception text.

    NOTE(review): the visible excerpt stops right after ``client_id`` is
    read from the parsed request; the "internal request" return documented
    below is not shown in this view -- confirm against the full source.

    :type context: satosa.context.Context
    :rtype: satosa.internal.InternalData
    :param context: the current context
    :return: the internal request
    """
    encoded_request = urlencode(context.request)
    logger.debug(
        lu.LOG_FMT.format(
            id=lu.get_session_id(context.state),
            message="Authn req from client: {}".format(encoded_request),
        )
    )

    try:
        authn_req = self.provider.parse_authentication_request(encoded_request)
    except InvalidAuthenticationRequest as e:
        failure_text = "Error in authn req: {}".format(str(e))
        logger.error(
            lu.LOG_FMT.format(
                id=lu.get_session_id(context.state), message=failure_text
            )
        )

        # Prefer redirecting back to the client when the exception can
        # produce an error URL; fall back to a plain bad-request response.
        redirect_target = e.to_error_url()
        if not redirect_target:
            return BadRequest("Something went wrong: {}".format(str(e)))
        return SeeOther(redirect_target)

    client_id = authn_req["client_id"]
try:
binding, destination = self.sp.pick_binding(
"single_sign_on_service", None, "idpsso", entity_id=entity_id
)
msg = "binding: {}, destination: {}".format(binding, destination)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
acs_endp, response_binding = self.sp.config.getattr("endpoints", "sp")["assertion_consumer_service"][0]
req_id, req = self.sp.create_authn_request(
destination, binding=response_binding, **kwargs
)
relay_state = util.rndstr()
ht_args = self.sp.apply_binding(binding, "%s" % req, destination, relay_state=relay_state)
msg = "ht_args: {}".format(ht_args)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
except Exception as exc:
msg = "Failed to construct the AuthnRequest for state"
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline, exc_info=True)
raise SATOSAAuthenticationError(context.state, "Failed to construct the AuthnRequest") from exc
if self.sp.config.getattr('allow_unsolicited', 'sp') is False:
if req_id in self.outstanding_queries:
msg = "Request with duplicate id {}".format(req_id)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
raise SATOSAAuthenticationError(context.state, msg)
self.outstanding_queries[req_id] = req
context.state[self.name] = {"relay_state": relay_state}
:type spec: ((satosa.context.Context, Any) -> satosa.response.Response, Any) |
(satosa.context.Context) -> satosa.response.Response
:param context: The request context
:param spec: bound endpoint function
:return: response
"""
try:
return spec(context)
except SATOSAAuthenticationError as error:
error.error_id = uuid.uuid4().urn
state = json.dumps(error.state.state_dict, indent=4)
msg = "ERROR_ID [{err_id}]\nSTATE:\n{state}".format(
err_id=error.error_id, state=state
)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.error(logline, error.state, exc_info=True)
return self._handle_satosa_authentication_error(error)
def process(self, context, data):
"""
Default interface for microservices. Process the input data for
the input context.
"""
issuer = data.auth_info.issuer
requester = data.requester
config = self.config.get(requester) or self.config["default"]
msg = {
"message": "entityID for the involved entities",
"requester": requester,
"issuer": issuer,
"config": self._filter_config(config),
}
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
# Ignore this SP entirely if so configured.
if config["ignore"]:
msg = "Ignoring SP {}".format(requester)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.info(logline)
return super().process(context, data)
# The list of values for the LDAP search filters that will be tried in
# order to find the LDAP directory record for the user.
filter_values = [
filter_value
for candidate in config["ordered_identifier_candidates"]
# Consider and find asserted values to construct the ordered list
# of values for the LDAP search filters.
hash_id = self._get_consent_id(internal_response.requester, internal_response.subject_id,
internal_response.attributes)
try:
consent_attributes = self._verify_consent(hash_id)
except ConnectionError as e:
msg = "Consent service is not reachable, no consent given."
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.error(logline)
# Send an internal_response without any attributes
consent_attributes = None
if consent_attributes is None:
msg = "Consent was NOT given"
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.info(logline)
# If consent was not given, then don't send any attributes
consent_attributes = []
else:
msg = "Consent was given"
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.info(logline)
internal_response.attributes = self._filter_attributes(internal_response.attributes, consent_attributes)
return self._end_consent(context, internal_response)