Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
in using the custom Authorization backend
djangosaml2.backends.Saml2Backend that should be
enabled in the settings.py
"""
attribute_mapping = attribute_mapping or get_custom_setting('SAML_ATTRIBUTE_MAPPING', {'uid': ('username', )})
create_unknown_user = create_unknown_user or get_custom_setting('SAML_CREATE_UNKNOWN_USER', True)
conf = get_config(config_loader_path, request)
try:
xmlstr = request.POST['SAMLResponse']
except KeyError:
logger.warning('Missing "SAMLResponse" parameter in POST data.')
raise SuspiciousOperation
client = Saml2Client(conf, identity_cache=IdentityCache(request.session))
oq_cache = OutstandingQueriesCache(request.session)
outstanding_queries = oq_cache.outstanding_queries()
try:
response = client.parse_authn_request_response(xmlstr, BINDING_HTTP_POST, outstanding_queries)
except (StatusError, ToEarly):
logger.exception("Error processing SAML Assertion.")
return fail_acs_response(request)
except ResponseLifetimeExceed:
logger.info("SAML Assertion is no longer valid. Possibly caused by network delay or replay attack.", exc_info=True)
return fail_acs_response(request)
except SignatureError:
logger.info("Invalid or malformed SAML Assertion.", exc_info=True)
return fail_acs_response(request)
except StatusAuthnFailed:
logger.info("Authentication denied for user by IdP.", exc_info=True)
return fail_acs_response(request)
# use the html provided by pysaml2 if no template was specified or it didn't exist
try:
session_id, result = client.prepare_for_authenticate(
entityid=selected_idp, relay_state=came_from,
binding=binding)
except TypeError as e:
logger.error('Unable to know which IdP to use')
return HttpResponse(text_type(e))
else:
http_response = HttpResponse(result['data'])
else:
raise UnsupportedBinding('Unsupported binding: %s', binding)
# success, so save the session ID and return our response
logger.debug('Saving the session_id in the OutstandingQueries cache')
oq_cache = OutstandingQueriesCache(request.session)
oq_cache.set(session_id, came_from)
return http_response
# use the html provided by pysaml2 if no template was specified or it didn't exist
try:
session_id, result = client.prepare_for_authenticate(
entityid=selected_idp, relay_state=came_from,
binding=binding)
except TypeError as e:
logger.error('Unable to know which IdP to use')
return HttpResponse(str(e))
else:
http_response = HttpResponse(result['data'])
else:
raise UnsupportedBinding('Unsupported binding: %s', binding)
# success, so save the session ID and return our response
logger.debug('Saving the session_id in the OutstandingQueries cache')
oq_cache = OutstandingQueriesCache(request.session)
oq_cache.set(session_id, came_from)
return http_response
djangosaml2.backends.Saml2Backend that should be
enabled in the settings.py
"""
attribute_mapping = attribute_mapping or get_custom_setting('SAML_ATTRIBUTE_MAPPING', {'uid': ('username', )})
create_unknown_user = create_unknown_user if create_unknown_user is not None else \
get_custom_setting('SAML_CREATE_UNKNOWN_USER', True)
conf = get_config(config_loader_path, request)
try:
xmlstr = request.POST['SAMLResponse']
except KeyError:
logger.warning('Missing "SAMLResponse" parameter in POST data.')
raise SuspiciousOperation
client = Saml2Client(conf, identity_cache=IdentityCache(request.session))
oq_cache = OutstandingQueriesCache(request.session)
outstanding_queries = oq_cache.outstanding_queries()
try:
response = client.parse_authn_request_response(xmlstr, BINDING_HTTP_POST, outstanding_queries)
except (StatusError, ToEarly):
logger.exception("Error processing SAML Assertion.")
return fail_acs_response(request)
except ResponseLifetimeExceed:
logger.info("SAML Assertion is no longer valid. Possibly caused by network delay or replay attack.", exc_info=True)
return fail_acs_response(request)
except SignatureError:
logger.info("Invalid or malformed SAML Assertion.", exc_info=True)
return fail_acs_response(request)
except StatusAuthnFailed:
logger.info("Authentication denied for user by IdP.", exc_info=True)
return fail_acs_response(request)
# use the html provided by pysaml2 if no template was specified or it didn't exist
try:
session_id, result = client.prepare_for_authenticate(
entityid=selected_idp, relay_state=came_from,
binding=binding)
except TypeError as e:
logger.error('Unable to know which IdP to use')
return HttpResponse(text_type(e))
else:
http_response = HttpResponse(result['data'])
else:
raise UnsupportedBinding('Unsupported binding: %s', binding)
# success, so save the session ID and return our response
logger.debug('Saving the session_id in the OutstandingQueries cache')
oq_cache = OutstandingQueriesCache(request.session)
oq_cache.set(session_id, came_from)
return http_response
create_unknown_user=None):
"""
SAML Authorization Response endpoint
"""
attribute_mapping = attribute_mapping or get_custom_setting('SAML_ATTRIBUTE_MAPPING', {'uid': ('username', )})
create_unknown_user = create_unknown_user or get_custom_setting('SAML_CREATE_UNKNOWN_USER', True)
conf = get_config(config_loader_path, request)
try:
xmlstr = request.POST['SAMLResponse']
except KeyError:
logger.warning('Missing "SAMLResponse" parameter in POST data.')
raise SuspiciousOperation
client = Saml2Client(conf, identity_cache=IdentityCache(self.request.session))
oq_cache = OutstandingQueriesCache(self.request.session)
outstanding_queries = oq_cache.outstanding_queries()
try:
response = client.parse_authn_request_response(xmlstr, BINDING_HTTP_POST, outstanding_queries)
except (StatusError, ToEarly) as e:
logger.exception("Error processing SAML Assertion.")
return fail_acs_response(request, exception=e)
except ResponseLifetimeExceed as e:
logger.info("SAML Assertion is no longer valid. Possibly caused by network delay or replay attack.", exc_info=True)
return fail_acs_response(request, exception=e)
except SignatureError as e:
logger.info("Invalid or malformed SAML Assertion.", exc_info=True)
return fail_acs_response(request, exception=e)
except StatusAuthnFailed as e:
logger.info("Authentication denied for user by IdP.", exc_info=True)
return fail_acs_response(request, exception=e)
def post(self, request):
serializer = self.serializer_class(data=request.data)
serializer.is_valid(raise_exception=True)
attribute_mapping = get_custom_setting(
'SAML_ATTRIBUTE_MAPPING', {'uid': ('username',)}
)
create_unknown_user = get_custom_setting('SAML_CREATE_UNKNOWN_USER', True)
conf = get_config(request=request)
client = Saml2Client(conf, identity_cache=IdentityCache(request.session))
oq_cache = OutstandingQueriesCache(request.session)
outstanding_queries = oq_cache.outstanding_queries()
xmlstr = serializer.validated_data['SAMLResponse']
# process the authentication response
try:
response = client.parse_authn_request_response(
xmlstr, BINDING_HTTP_POST, outstanding_queries
)
except Exception as e:
if isinstance(e, StatusRequestDenied):
return login_failed(
_(
'Authentication request has been denied by identity provider. '
'Please check your credentials.'
)
location = client.sso_location(idp, binding)
except TypeError:
error_message = _('Invalid identity provider specified.')
return JsonResponse({'error_message': error_message}, status=400)
session_id, request_xml = client.create_authn_request(
location, binding=binding, **kwargs
)
data = {
'binding': 'post',
'url': location,
'request': str(base64.b64encode(request_xml.encode('UTF-8')), 'utf-8'),
}
# save session_id
oq_cache = OutstandingQueriesCache(request.session)
oq_cache.set(session_id, '')
return JsonResponse(data)