Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def kerberos_authenticate(resource, req, resp, *args, **kwargs):
# Try pre-emptive authentication
if not req.auth:
if optional:
req.context["user"] = None
return func(resource, req, resp, *args, **kwargs)
logger.debug("No Kerberos ticket offered while attempting to access %s from %s",
req.env["PATH_INFO"], req.context.get("remote_addr"))
raise falcon.HTTPUnauthorized("Unauthorized",
"No Kerberos ticket offered, are you sure you've logged in with domain user account?",
["Negotiate"])
os.environ["KRB5_KTNAME"] = config.KERBEROS_KEYTAB
server_creds = gssapi.creds.Credentials(
usage='accept',
name=gssapi.names.Name('HTTP/%s'% const.FQDN))
context = gssapi.sec_contexts.SecurityContext(creds=server_creds)
if not req.auth.startswith("Negotiate "):
raise falcon.HTTPBadRequest("Bad request", "Bad header: %s" % req.auth)
token = ''.join(req.auth.split()[1:])
try:
context.step(b64decode(token))
except binascii.Error: # base64 errors
raise falcon.HTTPBadRequest("Bad request", "Malformed token")
except gssapi.raw.exceptions.BadMechanismError:
raise falcon.HTTPBadRequest("Bad request", "Unsupported authentication mechanism (NTLM?) was offered. Please make sure you've logged into the computer with domain user account. The web interface should not prompt for username or password.")
["Negotiate"])
else:
logger.debug("No credentials offered while attempting to access %s from %s",
req.env["PATH_INFO"], req.context.get("remote_addr"))
raise falcon.HTTPUnauthorized("Unauthorized", "Please authenticate", ("Basic",))
if kerberized:
if not req.auth.startswith("Negotiate "):
raise falcon.HTTPUnauthorized("Unauthorized",
"Bad header, expected Negotiate",
["Negotiate"])
os.environ["KRB5_KTNAME"] = config.KERBEROS_KEYTAB
try:
server_creds = gssapi.creds.Credentials(
usage='accept',
name=gssapi.names.Name('HTTP/%s'% const.FQDN))
except gssapi.raw.exceptions.BadNameError:
logger.error("Failed initialize HTTP service principal, possibly bad permissions for %s or /etc/krb5.conf" %
config.KERBEROS_KEYTAB)
raise
context = gssapi.sec_contexts.SecurityContext(creds=server_creds)
token = ''.join(req.auth.split()[1:])
try:
context.step(b64decode(token))
except binascii.Error: # base64 errors
raise falcon.HTTPBadRequest("Bad request", "Malformed token")
except gssapi.raw.exceptions.BadMechanismError:
import falcon
import gssapi
import json
import logging
import os
import re
import socket
import unicodedata
from identidude import config
from datetime import datetime, date
# Logger named after this module.
logger = logging.getLogger(__name__)
# Background on this GSSAPI setup:
# http://firstyear.id.au/blog/html/2015/11/26/python_gssapi_with_flask_and_s4u2proxy.html
# Export the keytab location via KRB5_KTNAME before the credentials below
# are created (presumably the Kerberos library reads it at that point --
# TODO confirm against the GSSAPI/krb5 documentation).
os.environ["KRB5_KTNAME"] = "FILE:/etc/identidude/server.keytab"
# Acceptor-side ('accept' usage) credentials for the service name
# HTTP/<local hostname>, where the hostname comes from socket.gethostname().
server_creds = gssapi.creds.Credentials(
usage='accept',
name=gssapi.names.Name('HTTP/%s'% (socket.gethostname())))
def apidoc(cls):
"""
Automagically document resource classes based on validate(), required(), etc decorators
"""
@serialize
def apidoc_on_options(resource, req, resp, *args, **kwargs):
d = {}
for key in dir(resource):
if key == "on_options": continue
if re.match("on_\w+", key):
func = getattr(resource, key)
d[key[3:]] = getattr(func, "_apidoc", None)
d[key[3:]]["description"] = (getattr(func, "__doc__") or u"").strip()
def init_with_keytab(self):
"""Initialize credential cache with keytab"""
creds_opts = {
'usage': 'initiate',
'name': self._cleaned_options['principal'],
}
store = {}
if self._cleaned_options['keytab'] != DEFAULT_KEYTAB:
store['client_keytab'] = self._cleaned_options['keytab']
if self._cleaned_options['ccache'] != DEFAULT_CCACHE:
store['ccache'] = self._cleaned_options['ccache']
if store:
creds_opts['store'] = store
creds = gssapi.creds.Credentials(**creds_opts)
try:
creds.lifetime
except gssapi.exceptions.ExpiredCredentialsError:
new_creds_opts = copy.deepcopy(creds_opts)
# Get new credential and put it into a temporary ccache
temp_directory = tempfile.mkdtemp('-krbcontext')
temp_ccache = os.path.join(temp_directory, 'ccache')
try:
new_creds_opts.setdefault('store', {})['ccache'] = temp_ccache
creds = gssapi.creds.Credentials(**new_creds_opts)
# Then, store new credential back to original specified ccache,
# whatever a given ccache file or the default one.
_store = None
# If default ccache is used, no need to specify ccache in
# store parameter passed to ``creds.store``.
if self._cleaned_options['ccache'] != DEFAULT_CCACHE:
res = rcred_cred_store.add_cred_from(store, self, name, mech,
usage, init_lifetime,
accept_lifetime)
elif impersonator is not None:
if rcred_s4u is None:
raise NotImplementedError("Your GSSAPI implementation does "
"not have support for S4U")
res = rcred_s4u.add_cred_impersonate_name(self, impersonator,
name, mech, usage,
init_lifetime,
accept_lifetime)
else:
res = rcreds.add_cred(self, name, mech, usage, init_lifetime,
accept_lifetime)
return Credentials(res.creds)
def handle_tkt_event(tkt_file):
    """Handle ticket created event.

    The principal name is taken from the base name of ``tkt_file``. When
    the name passes ``_is_valid``, initiator credentials are acquired with
    ``KRB5CCNAME`` pointing at the matching entry in ``tkt_spool_dir`` and
    a JSON document ``{"expires_at": <unix timestamp>}`` is written to the
    matching entry in ``tkt_info_dir``.

    If GSSAPI reports an error, the error is logged and the info file is
    removed so that no stale expiry information is left behind.

    :param tkt_file: path of the ticket file that triggered the event;
        only its base name is used.
    :returns: ``None``.
    """
    princ = os.path.basename(tkt_file)
    if not _is_valid(princ):
        return

    infofile = os.path.join(tkt_info_dir, princ)
    _LOGGER.info('Processing: %s', princ)
    try:
        os.environ['KRB5CCNAME'] = os.path.join(tkt_spool_dir, princ)
        creds = gssapi.creds.Credentials(usage='initiate')
        # Resolve the expiry before opening the file, so a failure here
        # does not leave a truncated info file on disk.
        expires_at = int(time.time()) + creds.lifetime
        payload = json.dumps({'expires_at': expires_at}).encode()
        with open(infofile, 'wb') as f:
            f.write(payload)
    except gssapi.raw.GSSError as gss_err:
        # Previously swallowed silently; record why the info file is dropped.
        _LOGGER.warning(
            'Unable to read ticket lifetime for %s: %s', princ, gss_err
        )
        fs.rm_safe(infofile)
    finally:
        # pop() instead of del: never raise KeyError from cleanup, which
        # would mask the exception that is actually propagating.
        os.environ.pop('KRB5CCNAME', None)