def test_start_authentication(self, name, service_provider, identity_providers):
    """Check the redirect URL produced by SAMLAuthenticationManager.start_authentication.

    A mocked SAMLConfiguration (debug and strict both off) is wrapped in a
    SAMLOneLoginConfiguration; the parametrized ``service_provider`` and
    ``identity_providers`` are returned by the corresponding getters. The
    ``SAMLRequest`` query parameter of the returned URL is base64-decoded and
    inflated, validated against the SAML 2.0 protocol schema, and its
    AssertionConsumerServiceURL / ProtocolBinding attributes are compared with
    the SERVICE_PROVIDER_WITH_UNSIGNED_REQUESTS fixture.

    ``name`` is the parametrization label and is not used in the body.
    """
    mocked_configuration = create_autospec(spec=SAMLConfiguration)
    mocked_configuration.get_debug = MagicMock(return_value=False)
    mocked_configuration.get_strict = MagicMock(return_value=False)
    mocked_configuration.get_service_provider = MagicMock(return_value=service_provider)
    mocked_configuration.get_identity_providers = MagicMock(return_value=identity_providers)

    manager = SAMLAuthenticationManager(
        SAMLOneLoginConfiguration(mocked_configuration),
        SAMLSubjectParser(),
    )

    with self.app.test_request_context('/'):
        redirect_url = manager.start_authentication(self._db, fixtures.IDP_1_ENTITY_ID, '')

        # The AuthnRequest travels as the SAMLRequest query parameter of the redirect.
        query_string = urlparse.urlsplit(redirect_url).query
        encoded_request = urlparse.parse_qs(query_string)['SAMLRequest'][0]
        authn_request_xml = OneLogin_Saml2_Utils.decode_base64_and_inflate(encoded_request)

        # Third positional argument is validate_xml's debug flag, kept False here.
        schema_check = OneLogin_Saml2_Utils.validate_xml(
            authn_request_xml,
            'saml-schema-protocol-2.0.xsd',
            False,
        )
        assert isinstance(schema_check, Document)

        root = fromstring(authn_request_xml)
        expected_acs = SERVICE_PROVIDER_WITH_UNSIGNED_REQUESTS.acs_service
        eq_(root.get('AssertionConsumerServiceURL'), expected_acs.url)
        eq_(root.get('ProtocolBinding'), expected_acs.binding.value)
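def redirect_to(self, url=None, parameters=None):
    """
    Redirects the user to the url passed by parameter or to the url that we defined in our SSO Request.

    When ``url`` is None, the ``RelayState`` entry of the request's GET data
    is used as the destination. That value arrives with the incoming request,
    so it is chosen by whoever sent the request; this method itself applies no
    check on it, and whatever OneLogin_Saml2_Utils.redirect enforces is not
    visible here. Callers relying on the RelayState default should validate it
    against their own allowed destinations before redirecting.

    :param url: The target URL to redirect the user
    :type url: string
    :param parameters: Extra parameters to be passed as part of the url
                       (None means no extra parameters)
    :type parameters: dict
    :returns: Redirection url
    """
    # Fresh dict per call instead of a shared mutable default.
    if parameters is None:
        parameters = {}
    if url is None and 'RelayState' in self.__request_data['get_data']:
        url = self.__request_data['get_data']['RelayState']
    return OneLogin_Saml2_Utils.redirect(url, parameters, request_data=self.__request_data)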
"""Parses a name ID format
NOTE: OneLogin's python-saml library used for implementing SAML authentication support only one name ID format.
If there are multiple name ID formats specified in the XML metadata, we select the first one.
:param provider_node: Parent IDPSSODescriptor/SPSSODescriptor node
:type provider_node: defusedxml.lxml.RestrictedElement
:return: Name ID format
:rtype: string
"""
name_id_format = NameIDFormat.UNSPECIFIED.value
name_id_format_nodes = OneLogin_Saml2_Utils.query(provider_node, './ md:NameIDFormat')
if len(name_id_format_nodes) > 0:
# OneLogin's python-saml supports only one name ID format so we select the first one
name_id_format = OneLogin_Saml2_Utils.element_text(name_id_format_nodes[0])
return name_id_format
:param relay_state: The Relay State
:type relay_state: str
:param algorithm: The Signature Algorithm
:type algorithm: str
:param saml_type: The target URL the user should be redirected to
:type saml_type: string SAMLRequest | SAMLResponse
:param lowercase_urlencoding: lowercase or no
:type lowercase_urlencoding: boolean
"""
sign_data = ['%s=%s' % (saml_type, OneLogin_Saml2_Utils.escape_url(saml_data, lowercase_urlencoding))]
if relay_state is not None:
sign_data.append('RelayState=%s' % OneLogin_Saml2_Utils.escape_url(relay_state, lowercase_urlencoding))
sign_data.append('SigAlg=%s' % OneLogin_Saml2_Utils.escape_url(algorithm, lowercase_urlencoding))
return '&'.join(sign_data)
provider = req['get_data']['provider']
except KeyError:
provider = list(providers[0].keys())[0]
req['get_data']['provider'] = provider
for index, provider_obj in enumerate(providers):
if list(provider_obj.keys())[0] == provider:
base_cfg = settings.SAML_PROVIDERS[index][provider]
break
if not base_cfg:
raise SAMLSettingsError("Provider %s was not found in settings" % provider)
final_cfg = base_cfg
try:
final_cfg['sp']['x509cert'] = OneLogin_Saml2_Utils.format_cert(final_cfg['sp']['x509cert'])
final_cfg['sp']['privateKey'] = OneLogin_Saml2_Utils.format_private_key(final_cfg['sp']['privateKey'])
final_cfg['idp']['x509cert'] = OneLogin_Saml2_Utils.format_cert(final_cfg['idp']['x509cert'])
except KeyError:
pass
return final_cfg
def format_sp_key(self):
    """
    Formats the private key.

    Replaces the SP settings' ``privateKey`` entry in place with the value
    returned by OneLogin_Saml2_Utils.format_private_key for it.
    """
    sp_settings = self.__sp
    raw_key = sp_settings['privateKey']
    sp_settings['privateKey'] = OneLogin_Saml2_Utils.format_private_key(raw_key)
(assertion_tag in signed_elements and signed_elements.count(assertion_tag) > 1) or \
(response_tag not in signed_elements and assertion_tag not in signed_elements):
return False
# Check that the signed elements found here, are the ones that will be verified
# by OneLogin_Saml2_Utils.validate_sign
if response_tag in signed_elements:
expected_signature_nodes = OneLogin_Saml2_Utils.query(self.document, OneLogin_Saml2_Utils.RESPONSE_SIGNATURE_XPATH)
if len(expected_signature_nodes) != 1:
raise OneLogin_Saml2_ValidationError(
'Unexpected number of Response signatures found. SAML Response rejected.',
OneLogin_Saml2_ValidationError.WRONG_NUMBER_OF_SIGNATURES_IN_RESPONSE
)
if assertion_tag in signed_elements:
expected_signature_nodes = self.__query(OneLogin_Saml2_Utils.ASSERTION_SIGNATURE_XPATH)
if len(expected_signature_nodes) != 1:
raise OneLogin_Saml2_ValidationError(
'Unexpected number of Assertion signatures found. SAML Response rejected.',
OneLogin_Saml2_ValidationError.WRONG_NUMBER_OF_SIGNATURES_IN_ASSERTION
)
return True
def __init__(self, settings, response=None):
    """
    Constructs a Logout Response object (Initialize params from settings
    and if provided load the Logout Response).

    Arguments are:
        * (OneLogin_Saml2_Settings) settings. Setting data
        * (string) response. A base64-encoded, deflated SAML Logout
                             response from the IdP.

    When ``response`` is None only the settings and the error slot are
    initialised; the decoded response and ``document`` are not set, so a
    message is expected to be supplied later (e.g. via build()) before they
    are read.
    """
    self.__settings = settings
    self.__error = None
    if response is None:
        return
    # Undo the HTTP-Redirect style encoding, then parse the XML into an etree.
    decoded_response = OneLogin_Saml2_Utils.decode_base64_and_inflate(response)
    self.__logout_response = decoded_response
    self.document = OneLogin_Saml2_XML.to_etree(decoded_response)
"""
self.__errors = []
self.__error_reason = None
if 'get_data' in self.__request_data and 'SAMLResponse' in self.__request_data['get_data']:
logout_response = OneLogin_Saml2_Logout_Response(self.__settings, self.__request_data['get_data']['SAMLResponse'])
self.__last_response = logout_response.get_xml()
if not logout_response.is_valid(self.__request_data, request_id):
self.__errors.append('invalid_logout_response')
self.__error_reason = logout_response.get_error()
elif logout_response.get_status() != OneLogin_Saml2_Constants.STATUS_SUCCESS:
self.__errors.append('logout_not_success')
else:
self.__last_message_id = logout_response.id
if not keep_local_session:
OneLogin_Saml2_Utils.delete_local_session(delete_session_cb)
elif 'get_data' in self.__request_data and 'SAMLRequest' in self.__request_data['get_data']:
logout_request = OneLogin_Saml2_Logout_Request(self.__settings, self.__request_data['get_data']['SAMLRequest'])
self.__last_request = logout_request.get_xml()
if not logout_request.is_valid(self.__request_data):
self.__errors.append('invalid_logout_request')
self.__error_reason = logout_request.get_error()
else:
if not keep_local_session:
OneLogin_Saml2_Utils.delete_local_session(delete_session_cb)
in_response_to = logout_request.id
self.__last_message_id = logout_request.id
response_builder = OneLogin_Saml2_Logout_Response(self.__settings)
response_builder.build(in_response_to)
self.__last_response = response_builder.get_xml()