Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Recover the list of IdPs recorded in the '_saml_idp' common domain cookie
# when the request carried one; otherwise start from an empty list.
if '_saml_idp' in cookie:
common_domain_cookie = cookie['_saml_idp']
msg = "Found existing common domain cookie {}".format(common_domain_cookie)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
# The cookie value is URL-quoted. Once unquoted it is split on whitespace and
# each entry is URL-safe base64 decoded, then decoded as UTF-8 text.
space_separated_b64_idp_string = unquote(common_domain_cookie.value)
b64_idp_list = space_separated_b64_idp_string.split()
idp_list = [urlsafe_b64decode(b64_idp).decode('utf-8') for b64_idp in b64_idp_list]
else:
msg = "No existing common domain cookie found"
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
idp_list = []
msg = "Common domain cookie list of IdPs is {}".format(idp_list)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
# Identify the current IdP just used for authentication in this flow.
this_flow_idp = internal_response.auth_info.issuer
# Remove all occurrences of the current IdP from the list of IdPs.
idp_list = [idp for idp in idp_list if idp != this_flow_idp]
# Append the current IdP, so it appears exactly once and is the last entry.
idp_list.append(this_flow_idp)
msg = "Added IdP {} to common domain cookie list of IdPs".format(this_flow_idp)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
msg = "Common domain cookie list of IdPs is now {}".format(idp_list)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
:type context: satosa.context.Context
:param context: Session context
"""
# Rebuild the session state from the request cookie, using the cookie name and
# encryption key taken from self.config. When cookie_to_state raises
# SATOSAStateError a new State() is used instead.
try:
state = cookie_to_state(
context.cookie,
self.config["COOKIE_STATE_NAME"],
self.config["STATE_ENCRYPTION_KEY"],
)
# NOTE(review): `e` is bound but not used in the visible lines.
except SATOSAStateError as e:
state = State()
finally:
# NOTE(review): if cookie_to_state raises anything other than
# SATOSAStateError, `state` is still unbound here and this assignment raises
# UnboundLocalError on top of the original error - confirm that is acceptable.
context.state = state
# NOTE(review): the full state and the raw cookie are written to the log at
# info level - confirm neither carries data that should stay out of logs.
msg = "Loaded state {state} from cookie {cookie}".format(state=state, cookie=context.cookie)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.info(logline)
:param sp_entity_id: The requesting sp entity id
:param state: The current state
:return: A list containing approved attributes
"""
# Ask the IdP policy which name format applies to this SP, and hand the IdP's
# attribute converters to the policy (stored on its `acs` attribute).
name_format = idp_policy.get_name_form(sp_entity_id)
attrconvs = idp.config.attribute_converters
idp_policy.acs = attrconvs
# Stays empty when no converter matches the SP's name format.
attribute_filter = []
for aconv in attrconvs:
if aconv.name_format == name_format:
# Offer every value of the converter's private `_fro` mapping to the policy
# (each mapped to None) and keep the keys of whatever restrict() returns.
all_attributes = {v: None for v in aconv._fro.values()}
attribute_filter = list(idp_policy.restrict(all_attributes, sp_entity_id, idp.metadata).keys())
# Only the first converter with a matching name format is consulted.
break
# Pass the filter through self.converter.to_internal_filter for the configured
# attribute profile - presumably mapping to internal attribute names; confirm.
attribute_filter = self.converter.to_internal_filter(self.attribute_profile, attribute_filter)
msg = "Filter: {}".format(attribute_filter)
logline = lu.LOG_FMT.format(id=lu.get_session_id(state), message=msg)
logger.debug(logline)
return attribute_filter
def _metadata_endpoint(self, context):
    """
    Build the backend metadata and return it as an XML response.

    :type context: satosa.context.Context
    :rtype: satosa.response.Response

    :param context: The current context
    :return: response with metadata
    """
    session_id = lu.get_session_id(context.state)
    logger.debug(lu.LOG_FMT.format(id=session_id, message="Sending metadata response"))

    # Arguments are passed positionally, exactly as before: only the config
    # (self.sp.config) and the third argument (4) are set, the rest are None.
    # NOTE(review): the meaning of 4 is not visible here - confirm against the
    # signature of create_metadata_string.
    raw_metadata = create_metadata_string(
        None, self.sp.config, 4, None, None, None, None, None
    )
    xml_document = raw_metadata.decode("utf-8")
    return Response(xml_document, content="text/xml")
def _verify_nonce(self, nonce, context):
    """
    Compare the OIDC 'nonce' taken from the ID Token with the one kept in
    this backend's entry of the session state.

    :param nonce: OIDC nonce
    :type nonce: str
    :param context: current request context
    :type context: satosa.context.Context

    :raise SATOSAAuthenticationError: if the nonce is incorrect
    """
    stored = context.state[self.name]
    expected_nonce = stored[NONCE_KEY]
    nonce_mismatch = nonce != expected_nonce

    if nonce_mismatch:
        # Log the backend state for diagnosis before rejecting the response.
        details = "Missing or invalid nonce in authn response for state: {}".format(stored)
        logger.debug(
            lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=details)
        )
        raise SATOSAAuthenticationError(
            context.state, "Missing or invalid nonce in authn response"
        )
# `msg` is set before the first line of this excerpt.
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
# NOTE(review): Logger.warn is a deprecated alias of Logger.warning.
logger.warn(logline)
if on_error:
# Redirect to the configured error handling service with
# the entityIDs for the target SP and IdP used by the user
# as query string parameters (URL encoded).
encodedSpEntityID = urllib.parse.quote_plus(spEntityID)
encodedIdpEntityID = urllib.parse.quote_plus(data.auth_info.issuer)
# NOTE(review): "?" is always appended, so this assumes on_error carries no
# query string of its own - confirm against the configuration.
url = "{}?sp={}&idp={}".format(on_error, encodedSpEntityID, encodedIdpEntityID)
msg = "{} Redirecting to {}".format(logprefix, url)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.info(logline)
return Redirect(url)
msg = "{} Found primary identifier: {}".format(logprefix, primary_identifier_val)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.info(logline)
# Clear input attributes if so configured.
if clear_input_attributes:
msg = "{} Clearing values for these input attributes: {}".format(
logprefix, data.attribute_names
)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
# Replaces the whole attribute mapping with an empty dict.
data.attributes = {}
# Set the primary identifier attribute to the value found.
data.attributes[primary_identifier] = primary_identifier_val
msg = "{} Setting attribute {} to value {}".format(
logprefix, primary_identifier, primary_identifier_val
)
:return: response
"""
req_info = idp.parse_authn_request(context.request["SAMLRequest"], binding_in)
authn_req = req_info.message
msg = "{}".format(authn_req)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
# keep the ForceAuthn value to be used by plugins
context.decorate(Context.KEY_FORCE_AUTHN, authn_req.force_authn)
try:
resp_args = idp.response_args(authn_req)
except SAMLError as e:
msg = "Could not find necessary info about entity: {}".format(e)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.error(logline)
return ServiceError("Incorrect request from requester: %s" % e)
requester = resp_args["sp_entity_id"]
context.state[self.name] = self._create_state_data(context, idp.response_args(authn_req),
context.request.get("RelayState"))
subject = authn_req.subject
name_id_value = subject.name_id.text if subject else None
nameid_formats = {
"from_policy": authn_req.name_id_policy and authn_req.name_id_policy.format,
"from_response": subject and subject.name_id and subject.name_id.format,
"from_metadata": (
idp.metadata[requester]
.get("spsso_descriptor", [{}])[0]