Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_full_flow(self, account_linking_config, internal_response, context):
ticket = "ticket"
with responses.RequestsMock() as rsps:
rsps.add(
responses.GET,
"%s/get_id" % account_linking_config["api_url"],
status=404,
body=ticket,
content_type="text/html"
)
result = self.account_linking.process(context, internal_response)
assert isinstance(result, Redirect)
assert result.message.startswith(account_linking_config["redirect_url"])
data = {
"idp": internal_response.auth_info.issuer,
"id": internal_response.subject_id,
"redirect_endpoint": self.account_linking.base_url + "/account_linking/handle_account_linking"
}
key = RSAKey(key=rsa_load(account_linking_config["sign_key"]), use="sig", alg="RS256")
jws = JWS(json.dumps(data), alg=key.alg).sign_compact([key])
uuid = "uuid"
with responses.RequestsMock() as rsps:
# account is linked, 200 OK
rsps.add(
responses.GET,
"%s/get_id?jwt=%s" % (account_linking_config["api_url"], jws),
status=200,
def assert_redirect(self, redirect_resp, expected_ticket):
    """Assert that a response redirects to the consent page of a ticket.

    :param redirect_resp: the response object to check; it must be a
        ``Redirect`` whose ``message`` attribute holds the target URL
    :param expected_ticket: ticket expected as the last part of the URL path
    """
    assert isinstance(redirect_resp, Redirect)
    # Only the path component is compared; scheme, host and query string of
    # the redirect target are not inspected here.
    target = urlparse(redirect_resp.message)
    assert target.path == "/consent/" + expected_ticket
def test_account_linking_failed(self, account_linking_config, internal_response, context):
    """Check that the issuer's subject_id is kept when no linked id is returned.

    A 404 answer is registered for the ``get_id`` lookup, so ``process`` is
    expected to hand back a redirect to the configured ``redirect_url``.
    Afterwards ``_handle_al_response`` is called and the resulting
    ``subject_id`` must equal the one the issuer originally sent.

    NOTE(review): ``responses.add`` is used at module level here, so this
    presumably relies on the mock being activated outside this function
    (e.g. by a decorator) -- confirm against the full file.
    """
    ticket = "ticket"
    get_id_url = "%s/get_id" % account_linking_config["api_url"]
    responses.add(
        responses.GET,
        get_id_url,
        status=404,
        body=ticket,
        content_type="text/html",
    )

    # Remember the identifier before the micro service gets to touch it.
    original_subject_id = internal_response.subject_id

    redirect = self.account_linking.process(context, internal_response)
    assert isinstance(redirect, Redirect)
    assert redirect.message.startswith(account_linking_config["redirect_url"])

    # The account linking endpoint still does not return an id.
    final_response = self.account_linking._handle_al_response(context)

    # Verify that we kept the subject_id the issuer sent us.
    assert final_response.subject_id == original_subject_id
"""
oauth_state = get_state(self.config["base_url"], rndstr().encode())
context.state[self.name] = dict(state=oauth_state)
request_args = dict(
response_type='code',
client_id=self.config['client_config']['client_id'],
redirect_uri=self.redirect_url,
state=oauth_state)
scope = ' '.join(self.config['scope'])
if scope:
request_args['scope'] = scope
cis = self.consumer.construct_AuthorizationRequest(
request_args=request_args)
return Redirect(cis.request(self.consumer.authorization_endpoint))
if not primary_identifier_val:
msg = "{} No primary identifier found".format(logprefix)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.warn(logline)
if on_error:
# Redirect to the configured error handling service with
# the entityIDs for the target SP and IdP used by the user
# as query string parameters (URL encoded).
encodedSpEntityID = urllib.parse.quote_plus(spEntityID)
encodedIdpEntityID = urllib.parse.quote_plus(data.auth_info.issuer)
url = "{}?sp={}&idp={}".format(on_error, encodedSpEntityID, encodedIdpEntityID)
msg = "{} Redirecting to {}".format(logprefix, url)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.info(logline)
return Redirect(url)
msg = "{} Found primary identifier: {}".format(logprefix, primary_identifier_val)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.info(logline)
# Clear input attributes if so configured.
if clear_input_attributes:
msg = "{} Clearing values for these input attributes: {}".format(
logprefix, data.attribute_names
)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
data.attributes = {}
# Set the primary identifier attribute to the value found.
data.attributes[primary_identifier] = primary_identifier_val
if self.locked_attr:
consent_args["locked_attrs"] = [self.locked_attr]
if 'requester_logo' in context.state[STATE_KEY]:
consent_args["requester_logo"] = context.state[STATE_KEY]['requester_logo']
try:
ticket = self._consent_registration(consent_args)
except (ConnectionError, UnexpectedResponseError) as e:
msg = "Consent request failed, no consent given: {}".format(str(e))
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.error(logline)
# Send an internal_response without any attributes
internal_response.attributes = {}
return self._end_consent(context, internal_response)
consent_redirect = "%s/%s" % (self.redirect_url, ticket)
return Redirect(consent_redirect)
def start_auth(self, context, internal_request, get_state=stateID):
    """
    Start the authentication by redirecting to the authorization endpoint.

    See super class method satosa.backends.base#start_auth

    The request arguments come from ``self.get_request_args``; their
    ``state`` value is stored in ``context.state`` under this backend's
    name before the redirect is built.

    :param get_state: Generates a state to be used in the authentication call.

    :type get_state: Callable[[str, bytes], str]
    :type context: satosa.context.Context
    :type internal_request: satosa.internal.InternalData
    :rtype: satosa.response.Redirect
    """
    args = self.get_request_args(get_state=get_state)
    # Keep only the state value in the session state for this backend.
    context.state[self.name] = dict(state=args["state"])

    auth_request = self.consumer.construct_AuthorizationRequest(request_args=args)
    login_url = auth_request.request(self.consumer.authorization_endpoint)
    return Redirect(login_url)
"""
oauth_state = get_state(self.config["base_url"], rndstr().encode())
context.state[self.name] = dict(state=oauth_state)
request_args = dict(
client_id=self.config['client_config']['client_id'],
redirect_uri=self.redirect_url,
state=oauth_state,
allow_signup=self.config.get('allow_signup', False))
scope = ' '.join(self.config['scope'])
if scope:
request_args['scope'] = scope
cis = self.consumer.construct_AuthorizationRequest(
request_args=request_args)
return Redirect(cis.request(self.consumer.authorization_endpoint))
on_ldap_search_result_empty = config["on_ldap_search_result_empty"]
if on_ldap_search_result_empty:
# Redirect to the configured URL with
# the entityIDs for the target SP and IdP used by the user
# as query string parameters (URL encoded).
encoded_sp_entity_id = urllib.parse.quote_plus(requester)
encoded_idp_entity_id = urllib.parse.quote_plus(issuer)
url = "{}?sp={}&idp={}".format(
on_ldap_search_result_empty,
encoded_sp_entity_id,
encoded_idp_entity_id,
)
msg = "Redirecting to {}".format(url)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.info(logline)
return Redirect(url)
msg = "Returning data.attributes {}".format(data.attributes)
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.debug(logline)
return super().process(context, data)