def test_start_authentication(self, name, service_provider, identity_providers):
    """Start an SP-initiated login and inspect the AuthnRequest it produces.

    The SAML configuration is mocked with debug and strict mode switched off
    and with the given service provider / identity providers. The AuthnRequest
    is read back out of the redirect URL's SAMLRequest parameter, inflated,
    validated against the SAML 2.0 protocol schema, and its ACS URL and binding
    are compared with the unsigned-requests service-provider fixture.

    :param name: Label of the parametrized case (not used by the body)
    :param service_provider: Service provider returned by the mocked configuration
    :param identity_providers: Identity providers returned by the mocked configuration
    """
    mocked_configuration = create_autospec(spec=SAMLConfiguration)
    mocked_configuration.get_debug = MagicMock(return_value=False)
    mocked_configuration.get_strict = MagicMock(return_value=False)
    mocked_configuration.get_service_provider = MagicMock(return_value=service_provider)
    mocked_configuration.get_identity_providers = MagicMock(return_value=identity_providers)

    manager = SAMLAuthenticationManager(
        SAMLOneLoginConfiguration(mocked_configuration),
        SAMLSubjectParser()
    )

    with self.app.test_request_context('/'):
        redirect_url = manager.start_authentication(self._db, fixtures.IDP_1_ENTITY_ID, '')

        # The AuthnRequest is carried deflated + base64 in the SAMLRequest query parameter.
        query = urlparse.parse_qs(urlparse.urlsplit(redirect_url).query)
        encoded_request = query['SAMLRequest'][0]
        authn_request = OneLogin_Saml2_Utils.decode_base64_and_inflate(encoded_request)

        # Schema validation returns a Document on success (third argument is the debug flag).
        schema_check = OneLogin_Saml2_Utils.validate_xml(
            authn_request,
            'saml-schema-protocol-2.0.xsd',
            False
        )
        assert isinstance(schema_check, Document)

        root = fromstring(authn_request)
        expected_acs = SERVICE_PROVIDER_WITH_UNSIGNED_REQUESTS.acs_service
        eq_(root.get('AssertionConsumerServiceURL'), expected_acs.url)
        eq_(root.get('ProtocolBinding'), expected_acs.binding.value)
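def test_init_saml_auth(self):
    """init_saml_auth() must return a OneLogin_Saml2_Auth for a prepared Django request.

    A plain GET on the SSO endpoint with an explicit Host header is converted
    with prepare_django_request() and handed to init_saml_auth().
    """
    request = RequestFactory().get('/sso/saml/?provider=MyProvider', HTTP_HOST='example.com')
    req = prepare_django_request(request)
    auth_obj = init_saml_auth(req)
    # assertIs keeps the original exact-type check (no subclasses) while
    # reporting both types when it fails, unlike assertTrue(type(...) is ...).
    self.assertIs(type(auth_obj), onelogin.saml2.auth.OneLogin_Saml2_Auth)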
)
encrypted_assertion_nodes = OneLogin_Saml2_Utils.query(dom, '/samlp:Response/saml:EncryptedAssertion')
if encrypted_assertion_nodes:
encrypted_data_nodes = OneLogin_Saml2_Utils.query(encrypted_assertion_nodes[0], '//saml:EncryptedAssertion/xenc:EncryptedData')
if encrypted_data_nodes:
keyinfo = OneLogin_Saml2_Utils.query(encrypted_assertion_nodes[0], '//saml:EncryptedAssertion/xenc:EncryptedData/ds:KeyInfo')
if not keyinfo:
raise OneLogin_Saml2_ValidationError(
'No KeyInfo present, invalid Assertion',
OneLogin_Saml2_ValidationError.KEYINFO_NOT_FOUND_IN_ENCRYPTED_DATA
)
keyinfo = keyinfo[0]
children = keyinfo.getchildren()
if not children:
raise OneLogin_Saml2_ValidationError(
'KeyInfo has no children nodes, invalid Assertion',
OneLogin_Saml2_ValidationError.CHILDREN_NODE_NOT_FOUND_IN_KEYINFO
)
for child in children:
if 'RetrievalMethod' in child.tag:
if child.attrib['Type'] != 'http://www.w3.org/2001/04/xmlenc#EncryptedKey':
raise OneLogin_Saml2_ValidationError(
'Unsupported Retrieval Method found',
OneLogin_Saml2_ValidationError.UNSUPPORTED_RETRIEVAL_METHOD
)
uri = child.attrib['URI']
if not uri.startswith('#'):
break
uri = uri.split('#')[1]
encrypted_key = OneLogin_Saml2_Utils.query(encrypted_assertion_nodes[0], './xenc:EncryptedKey[@Id=$tagid]', None, uri)
if encrypted_key:
want_authn_requests_signed = entity_descriptor_node.get('WantAuthnRequestsSigned', None)
name_id_format_nodes = OneLogin_Saml2_Utils.query(idp_descriptor_node, './md:NameIDFormat')
if len(name_id_format_nodes) > 0:
idp_name_id_format = OneLogin_Saml2_Utils.element_text(name_id_format_nodes[0])
sso_nodes = OneLogin_Saml2_Utils.query(
idp_descriptor_node,
"./md:SingleSignOnService[@Binding='%s']" % required_sso_binding
)
if len(sso_nodes) > 0:
idp_sso_url = sso_nodes[0].get('Location', None)
slo_nodes = OneLogin_Saml2_Utils.query(
idp_descriptor_node,
"./md:SingleLogoutService[@Binding='%s']" % required_slo_binding
)
if len(slo_nodes) > 0:
idp_slo_url = slo_nodes[0].get('Location', None)
signing_nodes = OneLogin_Saml2_Utils.query(idp_descriptor_node, "./md:KeyDescriptor[not(contains(@use, 'encryption'))]/ds:KeyInfo/ds:X509Data/ds:X509Certificate")
encryption_nodes = OneLogin_Saml2_Utils.query(idp_descriptor_node, "./md:KeyDescriptor[not(contains(@use, 'signing'))]/ds:KeyInfo/ds:X509Data/ds:X509Certificate")
if len(signing_nodes) > 0 or len(encryption_nodes) > 0:
certs = {}
if len(signing_nodes) > 0:
certs['signing'] = []
for cert_node in signing_nodes:
certs['signing'].append(''.join(OneLogin_Saml2_Utils.element_text(cert_node).split()))
if len(encryption_nodes) > 0:
keyinfo = OneLogin_Saml2_XML.query(encrypted_assertion_nodes[0], '//saml:EncryptedAssertion/xenc:EncryptedData/ds:KeyInfo')
if not keyinfo:
raise Exception('No KeyInfo present, invalid Assertion')
keyinfo = keyinfo[0]
children = keyinfo.getchildren()
if not children:
raise Exception('No child to KeyInfo, invalid Assertion')
for child in children:
if 'RetrievalMethod' in child.tag:
if child.attrib['Type'] != 'http://www.w3.org/2001/04/xmlenc#EncryptedKey':
raise Exception('Unsupported Retrieval Method found')
uri = child.attrib['URI']
if not uri.startswith('#'):
break
uri = uri.split('#')[1]
encrypted_key = OneLogin_Saml2_XML.query(encrypted_assertion_nodes[0], './xenc:EncryptedKey[@Id="' + uri + '"]')
if encrypted_key:
keyinfo.append(encrypted_key[0])
encrypted_data = encrypted_data_nodes[0]
decrypted = OneLogin_Saml2_Utils.decrypt_element(encrypted_data, key, debug)
xml.replace(encrypted_assertion_nodes[0], decrypted)
return xml
:raises: Exception if no private key available
:param xml: Encrypted Assertion
:type xml: Element
:returns: Decrypted Assertion
:rtype: Element
"""
key = self.__settings.get_sp_key()
debug = self.__settings.is_debug_active()
if not key:
raise Exception('No private key available, check settings')
encrypted_assertion_nodes = OneLogin_Saml2_XML.query(xml, '/samlp:Response/saml:EncryptedAssertion')
if encrypted_assertion_nodes:
encrypted_data_nodes = OneLogin_Saml2_XML.query(encrypted_assertion_nodes[0], '//saml:EncryptedAssertion/xenc:EncryptedData')
if encrypted_data_nodes:
keyinfo = OneLogin_Saml2_XML.query(encrypted_assertion_nodes[0], '//saml:EncryptedAssertion/xenc:EncryptedData/ds:KeyInfo')
if not keyinfo:
raise Exception('No KeyInfo present, invalid Assertion')
keyinfo = keyinfo[0]
children = keyinfo.getchildren()
if not children:
raise Exception('No child to KeyInfo, invalid Assertion')
for child in children:
if 'RetrievalMethod' in child.tag:
if child.attrib['Type'] != 'http://www.w3.org/2001/04/xmlenc#EncryptedKey':
raise Exception('Unsupported Retrieval Method found')
uri = child.attrib['URI']
if not uri.startswith('#'):
break
uri = uri.split('#')[1]
def _saml_login(self):
    """Begin SP-initiated SAML login by redirecting the browser to the IdP.

    The current request is first rewritten into the form the SAML toolkit
    expects, an auth object is built from the configured ``saml_config_folder``,
    and the URL returned by ``auth.login()`` becomes the redirect target.
    """
    toolkit_request = prepare_request_for_saml_toolkit(self.request)
    saml_auth = authenticate_saml(
        toolkit_request,
        custom_base_path=Settings['saml_config_folder'],
    )
    sso_url = saml_auth.login()
    return self.redirect(sso_url)
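def _parse_certificates(self, certificate_nodes):
    """Parses XML nodes containing X.509 certificates into a list of strings

    Every run of whitespace inside a node's text is removed, so certificates
    that metadata wraps across several lines come back as one unbroken string.

    :param certificate_nodes: List of XML nodes containing X.509 certificates
    :type certificate_nodes: List[defusedxml.lxml.RestrictedElement]

    :return: List of string containing X.509 certificates
    :rtype: List[string]

    :raise: SAMLMetadataParsingError when reading a node's text raises XMLSyntaxError
    """
    try:
        # Comprehension replaces the append loop; an XMLSyntaxError on any node
        # still discards the partial result, as before.
        return [
            ''.join(OneLogin_Saml2_Utils.element_text(certificate_node).split())
            for certificate_node in certificate_nodes
        ]
    except XMLSyntaxError as exception:
        raise SAMLMetadataParsingError(inner_exception=exception)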
if len(slo_nodes) > 0:
idp_slo_url = slo_nodes[0].get('Location', None)
signing_nodes = OneLogin_Saml2_Utils.query(idp_descriptor_node, "./md:KeyDescriptor[not(contains(@use, 'encryption'))]/ds:KeyInfo/ds:X509Data/ds:X509Certificate")
encryption_nodes = OneLogin_Saml2_Utils.query(idp_descriptor_node, "./md:KeyDescriptor[not(contains(@use, 'signing'))]/ds:KeyInfo/ds:X509Data/ds:X509Certificate")
if len(signing_nodes) > 0 or len(encryption_nodes) > 0:
certs = {}
if len(signing_nodes) > 0:
certs['signing'] = []
for cert_node in signing_nodes:
certs['signing'].append(''.join(OneLogin_Saml2_Utils.element_text(cert_node).split()))
if len(encryption_nodes) > 0:
certs['encryption'] = []
for cert_node in encryption_nodes:
certs['encryption'].append(''.join(OneLogin_Saml2_Utils.element_text(cert_node).split()))
data['idp'] = {}
if idp_entity_id is not None:
data['idp']['entityId'] = idp_entity_id
if idp_sso_url is not None:
data['idp']['singleSignOnService'] = {}
data['idp']['singleSignOnService']['url'] = idp_sso_url
data['idp']['singleSignOnService']['binding'] = required_sso_binding
if idp_slo_url is not None:
data['idp']['singleLogoutService'] = {}
data['idp']['singleLogoutService']['url'] = idp_slo_url
data['idp']['singleLogoutService']['binding'] = required_slo_binding
email=email,
active=True,
role=current_app.config.get('ONELOGIN_DEFAULT_ROLE')
# profile_picture=profile.get('thumbnailPhotoUrl')
)
db.session.add(user)
db.session.commit()
db.session.refresh(user)
# Tell Flask-Principal the identity changed
identity_changed.send(current_app._get_current_object(), identity=Identity(user.id))
login_user(user)
db.session.commit()
db.session.refresh(user)
self_url = OneLogin_Saml2_Utils.get_self_url(self.req)
if 'RelayState' in request.form and self_url != request.form['RelayState']:
return redirect(auth.redirect_to(request.form['RelayState']), code=302)
else:
return redirect(current_app.config.get('BASE_URL'), code=302)
else:
return dict(message='OneLogin authentication failed.'), 403
elif args['sls'] != None:
return dict(message='OneLogin SLS not implemented yet.'), 405
else:
return redirect(auth.login(return_to=return_to))