Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'name_id' : name_id,
'authn' : auth_info,
'sign_response' : sign_response,
'sign_assertion': sign_assertion,
'encrypt_assertion': encrypt_assertion,
'encrypted_advice_attributes': encrypted_advice_attributes
}
# Add the SP details
args.update(**resp_args)
try:
args['sign_alg'] = getattr(xmldsig, sign_alg)
except AttributeError as e:
msg = "Unsupported sign algorithm {}".format(sign_alg)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.error(logline)
raise Exception(msg) from e
else:
msg = "signing with algorithm {}".format(args['sign_alg'])
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
try:
args['digest_alg'] = getattr(xmldsig, digest_alg)
except AttributeError as e:
msg = "Unsupported digest algorithm {}".format(digest_alg)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.error(logline)
raise Exception(msg) from e
else:
msg = "using digest algorithm {}".format(args['digest_alg'])
processed.
:type context: satosa.context.Context
:type internal_request: satosa.internal.InternalData
:rtype: satosa.response.Response
:param context: The request context
:param internal_request: request processed by the frontend
:return: response
"""
state = context.state
state[STATE_KEY] = {"requester": internal_request.requester}
msg = "Requesting provider: {}".format(internal_request.requester)
logline = lu.LOG_FMT.format(id=lu.get_session_id(state), message=msg)
logger.info(logline)
if self.request_micro_services:
return self.request_micro_services[0].process(context, internal_request)
return self._auth_req_finish(context, internal_request)
except KeyError as err:
msg = "{} Unable to determine the entityID for the SP requester".format(logprefix)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.error(logline)
return super().process(context, data)
msg = "{} entityID for the SP requester is {}".format(logprefix, spEntityID)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
# Find the entityID for the IdP that issued the assertion
try:
idpEntityID = data.auth_info.issuer
except KeyError as err:
msg = "{} Unable to determine the entityID for the IdP issuer".format(logprefix)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.error(logline)
return super().process(context, data)
# Examine our configuration to determine if there is a per-IdP configuration
if idpEntityID in self.config:
config = self.config[idpEntityID]
msg = "{} For IdP {} using configuration {}".format(logprefix, idpEntityID, config)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
# Examine our configuration to determine if there is a per-SP configuration.
# An SP configuration overrides an IdP configuration when there is a conflict.
if spEntityID in self.config:
config = self.config[spEntityID]
msg = "{} For SP {} using configuration {}".format(logprefix, spEntityID, config)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
)
relay_state = util.rndstr()
ht_args = self.sp.apply_binding(binding, "%s" % req, destination, relay_state=relay_state)
msg = "ht_args: {}".format(ht_args)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
except Exception as exc:
msg = "Failed to construct the AuthnRequest for state"
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline, exc_info=True)
raise SATOSAAuthenticationError(context.state, "Failed to construct the AuthnRequest") from exc
if self.sp.config.getattr('allow_unsolicited', 'sp') is False:
if req_id in self.outstanding_queries:
msg = "Request with duplicate id {}".format(req_id)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
raise SATOSAAuthenticationError(context.state, msg)
self.outstanding_queries[req_id] = req
context.state[self.name] = {"relay_state": relay_state}
return make_saml_response(binding, ht_args)
):
msg = "{} IdP asserted NameID {}".format(logprefix, name_id_value)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
candidate_nameid_value = name_id_value
# Only add the NameID value asserted by the IdP if it is not already
# in the list of values. This is necessary because some non-compliant IdPs
# have been known, for example, to assert the value of eduPersonPrincipalName
# in the value for SAML2 persistent NameID as well as asserting
# eduPersonPrincipalName.
if candidate_nameid_value not in values:
msg = "{} Added NameID {} to candidate values".format(
logprefix, candidate_nameid_value
)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
values.append(candidate_nameid_value)
else:
msg = "{} NameID {} value also asserted as attribute value".format(
logprefix, candidate_nameid_value
)
logline = logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.warn(logline)
# If no value was asserted by the IdP for one of the configured list of attribute names
# for this candidate then go onto the next candidate.
if None in values:
msg = "{} Candidate is missing value so skipping".format(logprefix)
logline = logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
continue
def ping_endpoint(self, context):
    """
    Answer a ping request.

    Emits a debug log line (tagged with the session id derived from
    ``context.state``) announcing the ping, then builds the reply.

    :param context: the request context; only its ``state`` is read here
    :return: a ``Response`` constructed from a single-space message
    """
    session_id = lu.get_session_id(context.state)
    logger.debug(lu.LOG_FMT.format(id=session_id, message="Ping returning 200 OK"))
    # The body is deliberately just one space; the log text above says
    # "200 OK", so the status is presumably Response's default -- confirm.
    return Response(" ")
:param internal_response: the response
:return: response
"""
context.state[STATE_KEY] = context.state.get(STATE_KEY, {})
id_hash = self._get_consent_id(
internal_response.requester,
internal_response.subject_id,
internal_response.attributes,
)
try:
# Check if consent is already given
consent_attributes = self._verify_consent(id_hash)
except requests.exceptions.ConnectionError as e:
msg = "Consent service is not reachable, no consent is given."
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.error(logline)
# Send an internal_response without any attributes
internal_response.attributes = {}
return self._end_consent(context, internal_response)
# Previous consent was given
if consent_attributes is not None:
msg = "Previous consent was given"
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
internal_response.attributes = self._filter_attributes(internal_response.attributes, consent_attributes)
return self._end_consent(context, internal_response)
# No previous consent, request consent by user
return self._approve_new_consent(context, internal_response, id_hash)