Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def do_logout_service(request, data, binding, config_loader_path=None, next_page=None,
logout_error_template='djangosaml2/logout_error.html'):
"""SAML Logout Response endpoint
The IdP will send the logout response to this view,
which will process it with pysaml2 help and log the user
out.
Note that the IdP can request a logout even when
we didn't initiate the process as a single logout
request started by another SP.
"""
# NOTE(review): this snippet has lost its indentation and is cut off inside the
# final logger.warning( call; the comments here describe only the visible lines.
logger.debug('Logout service started')
# Build the SAML client from the loaded config; the state cache and the
# identity cache are both constructed from request.session.
conf = get_config(config_loader_path, request)
state = StateCache(request.session)
client = Saml2Client(conf, state_cache=state,
identity_cache=IdentityCache(request.session))
if 'SAMLResponse' in data: # we started the logout
logger.debug('Receiving a logout response from the IdP')
# Parse the response, call state.sync(), then hand the parsed response
# to finish_logout together with next_page.
response = client.parse_logout_request_response(data['SAMLResponse'], binding)
state.sync()
return finish_logout(request, response, next_page=next_page)
elif 'SAMLRequest' in data: # logout started by the IdP
logger.debug('Receiving a logout request from the IdP')
subject_id = _get_subject_id(request.session)
if subject_id is None:
# _get_subject_id returned None: a warning is logged (its message is
# not visible in this excerpt).
logger.warning(
def do_logout_service(request, data, binding, config_loader_path=None, next_page=None,
logout_error_template='djangosaml2/logout_error.html'):
"""SAML Logout Response endpoint
The IdP will send the logout response to this view,
which will process it with pysaml2 help and log the user
out.
Note that the IdP can request a logout even when
we didn't initiate the process as a single logout
request started by another SP.
"""
# NOTE(review): this snippet has lost its indentation and is cut off inside the
# final logger.warning( call; the comments here describe only the visible lines.
logger.debug('Logout service started')
# Build the SAML client from the loaded config; the state cache and the
# identity cache are both constructed from request.session.
conf = get_config(config_loader_path, request)
state = StateCache(request.session)
client = Saml2Client(conf, state_cache=state,
identity_cache=IdentityCache(request.session))
if 'SAMLResponse' in data: # we started the logout
logger.debug('Receiving a logout response from the IdP')
# Parse the response, call state.sync(), then hand the parsed response
# to finish_logout together with next_page.
response = client.parse_logout_request_response(data['SAMLResponse'], binding)
state.sync()
return finish_logout(request, response, next_page=next_page)
elif 'SAMLRequest' in data: # logout started by the IdP
logger.debug('Receiving a logout request from the IdP')
subject_id = _get_subject_id(request.session)
if subject_id is None:
# _get_subject_id returned None: a warning is logged (its message is
# not visible in this excerpt).
logger.warning(
def echo_attributes(request,
                    config_loader_path=None,
                    template='djangosaml2/echo_attributes.html'):
    """Example view that echoes the SAML attributes of the current user.

    Looks up the identity stored for the session's subject id and renders
    its first element into ``template`` under the ``attributes`` key. If
    the lookup raises ``AttributeError``, a plain-text response saying that
    no active SAML identity was found is returned instead.
    """
    session = request.session
    state_cache = StateCache(session)
    sp_config = get_config(config_loader_path, request)
    saml_client = Saml2Client(
        sp_config,
        state_cache=state_cache,
        identity_cache=IdentityCache(session),
    )
    name_id = _get_subject_id(session)

    try:
        identity_info = saml_client.users.get_identity(
            name_id, check_not_on_or_after=False)
    except AttributeError:
        return HttpResponse("No active SAML identity found. Are you sure you have logged in via SAML?")
    else:
        # Only the first element of the lookup result is exposed to the template.
        context = {'attributes': identity_info[0]}
        return render(request, template, context, using='django')
def post(self, request):
# NOTE(review): this snippet has lost its indentation and appears to end early;
# nothing is returned after the client is created in the visible lines.
# Non-anonymous users are rejected with a 400 JSON error.
if not self.request.user.is_anonymous:
error_message = _('This endpoint is for anonymous users only.')
return JsonResponse({'error_message': error_message}, status=400)
# Validate the request payload (raises on invalid data) and read the chosen IdP.
serializer = self.serializer_class(data=request.data)
serializer.is_valid(raise_exception=True)
idp = serializer.validated_data.get('idp')
conf = get_config(request=request)
# ensure our selected binding is supported by the IDP
# Preference order: the configured DEFAULT_BINDING, then HTTP-POST, then
# HTTP-Redirect; if none is supported a 400 JSON error is returned.
supported_bindings = utils.get_idp_sso_supported_bindings(idp, config=conf)
default_binding = settings.WALDUR_AUTH_SAML2.get('DEFAULT_BINDING')
if default_binding in supported_bindings:
binding = default_binding
elif BINDING_HTTP_POST in supported_bindings:
binding = BINDING_HTTP_POST
elif BINDING_HTTP_REDIRECT in supported_bindings:
binding = BINDING_HTTP_REDIRECT
else:
error_message = _('Identity provider does not support available bindings.')
return JsonResponse({'error_message': error_message}, status=400)
client = Saml2Client(conf)
def post(self,
request,
config_loader_path=None,
attribute_mapping=None,
create_unknown_user=None):
"""
SAML Authorization Response endpoint
"""
# NOTE(review): this snippet has lost its indentation and appears to end early;
# the handling of a successfully parsed response is not visible here.
# Falsy arguments fall back to the custom settings (with the defaults shown).
# NOTE(review): because of the `or`, an explicit create_unknown_user=False is
# also replaced by the setting value; confirm this is intended.
attribute_mapping = attribute_mapping or get_custom_setting('SAML_ATTRIBUTE_MAPPING', {'uid': ('username', )})
create_unknown_user = create_unknown_user or get_custom_setting('SAML_CREATE_UNKNOWN_USER', True)
conf = get_config(config_loader_path, request)
# A POST without 'SAMLResponse' is logged and raised as SuspiciousOperation.
try:
xmlstr = request.POST['SAMLResponse']
except KeyError:
logger.warning('Missing "SAMLResponse" parameter in POST data.')
raise SuspiciousOperation
client = Saml2Client(conf, identity_cache=IdentityCache(self.request.session))
oq_cache = OutstandingQueriesCache(self.request.session)
outstanding_queries = oq_cache.outstanding_queries()
# Parse the response using the HTTP-POST binding and the outstanding queries;
# StatusError / ToEarly are logged with traceback and turned into the
# fail_acs_response result, which receives the exception.
try:
response = client.parse_authn_request_response(xmlstr, BINDING_HTTP_POST, outstanding_queries)
except (StatusError, ToEarly) as e:
logger.exception("Error processing SAML Assertion.")
return fail_acs_response(request, exception=e)
def logout(self, request, data, binding):
# NOTE(review): this snippet has lost its indentation and is cut off at the
# final dangling `else:`; the branch for a present subject id is not visible.
# Build the SAML client; the state cache and the identity cache are both
# constructed from request.session.
conf = get_config(request=request)
state = StateCache(request.session)
client = Saml2Client(
conf, state_cache=state, identity_cache=IdentityCache(request.session)
)
if 'SAMLResponse' in data:
# Logout started by us
# The parsed result is discarded; the response is whatever logout_completed() returns.
client.parse_logout_request_response(data['SAMLResponse'], binding)
http_response = logout_completed()
else:
# Logout started by IdP
subject_id = _get_subject_id(request.session)
if subject_id is None:
# No subject id available: treated the same as a completed logout.
http_response = logout_completed()
else:
def logout(request, config_loader_path=None):
"""SAML Logout Request initiator
This view initiates the SAML2 Logout request
using the pysaml2 library to create the LogoutRequest.
"""
# NOTE(review): this snippet has lost its indentation and may end early; no
# return is visible for the case where global_logout yields a truthy result.
state = StateCache(request.session)
conf = get_config(config_loader_path, request)
client = Saml2Client(conf, state_cache=state,
identity_cache=IdentityCache(request.session))
subject_id = _get_subject_id(request.session)
# NOTE(review): as the statements read, a missing subject id is only logged;
# global_logout below still appears to be called with None. Confirm.
if subject_id is None:
logger.warning(
'The session does not contain the subject id for user %s',
request.user)
result = client.global_logout(subject_id)
state.sync()
# A falsy result is reported as an error and answered with a 400 response.
if not result:
logger.error("Looks like the user %s is not logged in any IdP/AA", subject_id)
return HttpResponseBadRequest("You are not logged in any IdP/AA")
# SAML_IGNORE_AUTHENTICATED_USERS_ON_LOGIN setting. If that setting
# is True (default value) we will redirect them to the came_from view.
# Otherwise, we will show a (configurable) authorization error.
if request.user.is_authenticated:
redirect_authenticated_user = getattr(settings, 'SAML_IGNORE_AUTHENTICATED_USERS_ON_LOGIN', True)
if redirect_authenticated_user:
return HttpResponseRedirect(came_from)
else:
logger.debug('User is already logged in')
return render(request, authorization_error_template, {
'came_from': came_from,
})
selected_idp = request.GET.get('idp', None)
try:
conf = get_config(config_loader_path, request)
except SourceNotFound as excp:
msg = ('Error, IdP EntityID was not found '
'in metadata: {}')
logger.exception(msg.format(excp))
return HttpResponse(msg.format(('Please contact '
'technical support.')),
status=500)
kwargs = {}
# pysaml needs a string otherwise: "cannot serialize True (type bool)"
if getattr(conf, '_sp_force_authn', False):
kwargs['force_authn'] = "true"
if getattr(conf, '_sp_allow_create', False):
kwargs['allow_create'] = "true"
# is an embedded WAYF needed?
def assertion_consumer_service(request,
config_loader_path=None,
attribute_mapping=None,
create_unknown_user=None):
"""SAML Authorization Response endpoint
The IdP will send its response to this view, which
will process it with pysaml2 help and log the user
in using the custom Authorization backend
djangosaml2.backends.Saml2Backend that should be
enabled in the settings.py
"""
# NOTE(review): this snippet has lost its indentation and appears to end early;
# the handling of a successfully parsed response is not visible here.
# Falsy arguments fall back to the custom settings (with the defaults shown).
# NOTE(review): because of the `or`, an explicit create_unknown_user=False is
# also replaced by the setting value; confirm this is intended.
attribute_mapping = attribute_mapping or get_custom_setting('SAML_ATTRIBUTE_MAPPING', {'uid': ('username', )})
create_unknown_user = create_unknown_user or get_custom_setting('SAML_CREATE_UNKNOWN_USER', True)
conf = get_config(config_loader_path, request)
# A POST without 'SAMLResponse' is logged and raised as SuspiciousOperation.
try:
xmlstr = request.POST['SAMLResponse']
except KeyError:
logger.warning('Missing "SAMLResponse" parameter in POST data.')
raise SuspiciousOperation
client = Saml2Client(conf, identity_cache=IdentityCache(request.session))
oq_cache = OutstandingQueriesCache(request.session)
outstanding_queries = oq_cache.outstanding_queries()
# Parse the response using the HTTP-POST binding and the outstanding queries;
# StatusError / ToEarly are logged with traceback and answered with the
# fail_acs_response result.
try:
response = client.parse_authn_request_response(xmlstr, BINDING_HTTP_POST, outstanding_queries)
except (StatusError, ToEarly):
logger.exception("Error processing SAML Assertion.")
return fail_acs_response(request)
if not request.user.is_anonymous():
try:
redirect_authenticated_user = settings.SAML_IGNORE_AUTHENTICATED_USERS_ON_LOGIN
except AttributeError:
redirect_authenticated_user = True
if redirect_authenticated_user:
return HttpResponseRedirect(came_from)
else:
logger.debug('User is already logged in')
return render(request, authorization_error_template, {
'came_from': came_from,
})
selected_idp = request.GET.get('idp', None)
conf = get_config(config_loader_path, request)
# is an embedded WAYF needed?
idps = available_idps(conf)
if selected_idp is None and len(idps) > 1:
logger.debug('A discovery process is needed')
return render(request, wayf_template, {
'available_idps': idps.items(),
'came_from': came_from,
})
# choose a binding to try first
sign_requests = getattr(conf, '_sp_authn_requests_signed', False)
binding = BINDING_HTTP_POST if sign_requests else BINDING_HTTP_REDIRECT
logger.debug('Trying binding %s for IDP %s', binding, selected_idp)
# ensure our selected binding is supported by the IDP