Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_filter_one_attribute_for_one_target_provider(self):
    """
    A filter configured for a single target provider is applied to
    responses issued by that provider.

    The filter for attribute ``a1`` is ``foo:bar``; of the two incoming
    values only the one containing that text is expected to remain.
    """
    target_provider = "test_provider"
    # Rules for attribute "a1", nested under an empty-string key
    # (presumably "any requester" -- confirm against the filter service).
    rules_for_provider = {"": {"a1": "foo:bar"}}
    service = self.create_filter_service({target_provider: rules_for_provider})

    # Response whose issuer matches the provider the filter is keyed on.
    issuer_info = AuthenticationInformation(issuer=target_provider)
    internal_data = InternalData(auth_info=issuer_info)
    internal_data.attributes = {"a1": ["abc:xyz", "1:foo:bar:2"]}

    result = service.process(None, internal_data)

    assert result.attributes == {"a1": ["1:foo:bar:2"]}
def test_authz_allow_fail(self):
    """
    Authorization must fail when no value of ``a0`` is in the allow list.

    The allow configuration lists ``foo1`` and ``foo2`` for attribute
    ``a0`` while the response only carries ``bar``, so ``process`` is
    expected to raise ``SATOSAAuthenticationError``.
    """
    # Nested under the "" and "default" keys (presumably "any requester" /
    # "default provider" -- confirm against the authz service).
    attribute_allow = {"": {"default": {"a0": ["foo1", "foo2"]}}}
    attribute_deny = {}
    authz_service = self.create_authz_service(attribute_allow, attribute_deny)

    resp = InternalData(auth_info=AuthenticationInformation())
    resp.attributes = {"a0": ["bar"]}

    # Build the context outside the try block so that only the call under
    # test can satisfy the expected-exception check.
    ctx = Context()
    ctx.state = {}

    try:
        authz_service.process(ctx, resp)
    except SATOSAAuthenticationError:
        pass  # expected: the response must be rejected
    else:
        # Raised explicitly (not via ``assert``) so the failure carries a
        # message and is not stripped when Python runs with -O.
        raise AssertionError(
            "process() did not raise SATOSAAuthenticationError for a "
            "response whose 'a0' values are not in the allow list"
        )
def test_attributes_general(self, ldap_attribute_store):
# Purpose: for each mock LDAP person record, feed the LDAP attribute store
# a response that carries only the record's uid, then check the attributes
# the store added.
# NOTE(review): this snippet has lost its indentation and is cut off after
# the final `if` below; the actual assertions are not visible here.
# Mapping from LDAP attribute names to internal attribute names, taken from
# the 'default' section of the store configuration.
ldap_to_internal_map = (self.ldap_attribute_store_config['default']
['ldap_to_internal_map'])
for dn, attributes in self.ldap_person_records:
# Mock up the internal response the LDAP attribute store is
# expecting to receive.
response = InternalData(auth_info=AuthenticationInformation())
# The LDAP attribute store configuration and the mock records
# expect to use an LDAP search filter for the uid attribute.
uid = attributes['uid']
response.attributes = {'uid': uid}
# Fresh context with an empty state dict for every record.
context = Context()
context.state = dict()
ldap_attribute_store.process(context, response)
# Verify that the LDAP attribute store has retrieved the mock
# records from the mock LDAP server and has added the appropriate
# internal attributes.
for ldap_attr, ldap_value in attributes.items():
# Only LDAP attributes that have an internal mapping are considered.
if ldap_attr in ldap_to_internal_map:
def test_authz_deny_success(self):
    """
    Authorization must fail when a value of ``a0`` is in the deny list.

    The deny configuration lists ``foo1`` and ``foo2`` for attribute
    ``a0`` and the response carries ``foo2``, so ``process`` is expected
    to raise ``SATOSAAuthenticationError``.
    """
    # Nested under the "" and "default" keys (presumably "any requester" /
    # "default provider" -- confirm against the authz service).
    attribute_deny = {"": {"default": {"a0": ["foo1", "foo2"]}}}
    attribute_allow = {}
    authz_service = self.create_authz_service(attribute_allow, attribute_deny)

    resp = InternalData(auth_info=AuthenticationInformation())
    resp.attributes = {"a0": ["foo2"]}

    # Build the context outside the try block so that only the call under
    # test can satisfy the expected-exception check.
    ctx = Context()
    ctx.state = {}

    try:
        authz_service.process(ctx, resp)
    except SATOSAAuthenticationError:
        pass  # expected: the denied value must be rejected
    else:
        # Raised explicitly (not via ``assert``) so the failure carries a
        # message and is not stripped when Python runs with -O.
        raise AssertionError(
            "process() did not raise SATOSAAuthenticationError for a "
            "response whose 'a0' value is in the deny list"
        )
# NOTE(review): fragment of a method body; the enclosing `def` and the
# definitions of `r`, `state_data` and `context` are outside this view, and
# indentation has been stripped.
# Decode the body of `r` as JSON (presumably the access-token HTTP
# response -- confirm against the code above this fragment).
response = r.json()
# State verification is on unless 'verify_accesstoken_state' is set to a
# falsy value in the config (the lookup defaults to True).
if self.config.get('verify_accesstoken_state', True):
self._verify_state(response, state_data, context.state)
auth_info = self.auth_info(context.request)
# Two user_information lookups with the same access token: one for
# 'email_info', one for 'user_info'.
user_email_response = self.user_information(response["access_token"], 'email_info')
user_info = self.user_information(response["access_token"], 'user_info')
# Collect the 'emailAddress' found under each element's 'handle~' entry.
user_email = {
"emailAddress": [
element['handle~']['emailAddress']
for element in user_email_response['elements']
]
}
# Merge the e-mail addresses into the user info before conversion.
user_info.update(user_email)
internal_response = InternalData(auth_info=auth_info)
internal_response.attributes = self.converter.to_internal(
self.external_type, user_info)
# Raises KeyError if user_info has no entry for self.user_id_attr.
internal_response.subject_id = user_info[self.user_id_attr]
# Remove this component's entry from the state before calling the callback.
del context.state[self.name]
return self.auth_callback_func(context, internal_response)
# NOTE(review): fragment of a method body; the enclosing `def` and the
# definitions of `error_url`, `e`, `authn_req`, `request` and `context` are
# outside this view, and indentation has been stripped.
# Redirect to the error URL when there is one; otherwise answer with a
# BadRequest whose message includes the text of exception `e`.
if error_url:
return SeeOther(error_url)
else:
return BadRequest("Something went wrong: {}".format(str(e)))
client_id = authn_req["client_id"]
# Keep the request in the state under this component's name.
context.state[self.name] = {"oidc_request": request}
# Subject type falls back to "pairwise" when the client entry has none.
subject_type = self.provider.clients[client_id].get("subject_type", "pairwise")
client_name = self.provider.clients[client_id].get("client_name")
if client_name:
# TODO should process client names for all languages, see OIDC Registration, Section 2.1
requester_name = [{"lang": "en", "text": client_name}]
else:
# No (or empty) client name registered: no requester name is passed on.
requester_name = None
internal_req = InternalData(
subject_type=subject_type,
requester=client_id,
requester_name=requester_name,
)
# The attribute filter is built from the approved attributes, which are
# derived from the provider's "claims_supported" and the request.
internal_req.attributes = self.converter.to_internal_filter(
"openid", self._get_approved_attributes(self.provider.configuration_information["claims_supported"],
authn_req))
return internal_req
def _translate_response(self, response, issuer):
    """
    Build a SATOSA internal response from an OIDC response.

    :type response: dict[str, str]
    :type issuer: str
    :rtype: InternalData

    :param response: Dictionary with attribute name as key; it must
        contain a "sub" entry, which becomes the subject id.
    :param issuer: The OIDC OP that gave the response.
    :return: A SATOSA internal response whose authentication information
        is built from UNSPECIFIED, the current time as a string, and the
        issuer.
    """
    timestamp = str(datetime.now())
    authentication = AuthenticationInformation(UNSPECIFIED, timestamp, issuer)

    translated = InternalData(auth_info=authentication)
    # Convert the claims using the "openid" profile of the converter.
    translated.attributes = self.converter.to_internal("openid", response)
    translated.subject_id = response["sub"]
    return translated
def _handle_consent_response(self, context):
"""
Endpoint for handling consent service response
:type context: satosa.context.Context
:rtype: satosa.response.Response
:param context: response context
:return: response
"""
# NOTE(review): this snippet has lost its indentation and is cut off after
# the last line below; the rest of the method is not visible here.
# Rebuild the internal response that was saved in the state under STATE_KEY.
consent_state = context.state[STATE_KEY]
saved_resp = consent_state["internal_resp"]
internal_response = InternalData.from_dict(saved_resp)
# The consent id is derived from the requester, the subject id and the
# attributes of the restored response.
hash_id = self._get_consent_id(internal_response.requester, internal_response.subject_id,
internal_response.attributes)
try:
consent_attributes = self._verify_consent(hash_id)
except ConnectionError as e:
# An unreachable consent service is logged as an error and handled the
# same way as "no consent given".
msg = "Consent service is not reachable, no consent given."
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
logger.error(logline)
# Send an internal_response without any attributes
consent_attributes = None
# None means no consent: either the verification returned None or the
# service could not be reached.
if consent_attributes is None:
msg = "Consent was NOT given"
logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
from enum import Enum
from saml2.saml import NAMEID_FORMAT_TRANSIENT
from saml2.saml import NAMEID_FORMAT_PERSISTENT
from saml2.saml import NAMEID_FORMAT_EMAILADDRESS
from saml2.saml import NAMEID_FORMAT_UNSPECIFIED
from satosa.internal import AuthenticationInformation as _AuthenticationInformation
from satosa.internal import InternalData as _InternalData
from satosa import util
_warnings.simplefilter("default")
class InternalRequest(_InternalData):
def __init__(self, user_id_hash_type, requester, requester_name=None):
msg = (
"InternalRequest is deprecated."
" Use satosa.internal.InternalData class instead."
)
_warnings.warn(msg, DeprecationWarning)
super().__init__(
user_id_hash_type=user_id_hash_type,
requester=requester,
requester_name=requester_name,
)
@classmethod
def from_dict(cls, data):
instance = cls(
user_id_hash_type=data.get("hash_type"),