Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
req.context["user"] = None
return func(resource, req, resp, *args, **kwargs)
logger.debug("No Kerberos ticket offered while attempting to access %s from %s",
req.env["PATH_INFO"], req.context.get("remote_addr"))
raise falcon.HTTPUnauthorized("Unauthorized",
"No Kerberos ticket offered, are you sure you've logged in with domain user account?",
["Negotiate"])
os.environ["KRB5_KTNAME"] = config.KERBEROS_KEYTAB
server_creds = gssapi.creds.Credentials(
usage='accept',
name=gssapi.names.Name('HTTP/%s'% const.FQDN))
context = gssapi.sec_contexts.SecurityContext(creds=server_creds)
if not req.auth.startswith("Negotiate "):
raise falcon.HTTPBadRequest("Bad request", "Bad header: %s" % req.auth)
token = ''.join(req.auth.split()[1:])
try:
context.step(b64decode(token))
except binascii.Error: # base64 errors
raise falcon.HTTPBadRequest("Bad request", "Malformed token")
except gssapi.raw.exceptions.BadMechanismError:
raise falcon.HTTPBadRequest("Bad request", "Unsupported authentication mechanism (NTLM?) was offered. Please make sure you've logged into the computer with domain user account. The web interface should not prompt for username or password.")
try:
username, domain = str(context.initiator_name).split("@")
except AttributeError: # TODO: Better exception
# Request wrapper that authenticates the caller from the token carried in
# `req.auth` via a GSSAPI security context, records the caller's identity in
# `req.context`, and then calls the wrapped `func` with the same arguments.
# NOTE(review): `func`, `delegate_credentials` and `server_creds` are not
# defined in this block -- presumably they come from an enclosing decorator
# or module scope; confirm against the full file.
# NOTE(review): leading indentation is missing in this excerpt and some lines
# may have been elided, so the nesting of the branches below cannot be
# established from here; the comments describe each statement on its own.
def kerberos_authenticate(resource, req, resp, *args, **kwargs):
# No credentials in the request at all: log the path and remote address, then
# reject with HTTPUnauthorized (the ["Negotiate"] list is presumably the
# challenge advertised back to the client -- confirm against the falcon API).
if not req.auth:
logger.debug(u"No Kerberos ticket offered while attempting to access %s from %s",
req.env["PATH_INFO"], req.context.get("remote_addr"))
raise falcon.HTTPUnauthorized("Unauthorized",
"No Kerberos ticket offered, are you sure you've logged in with domain user account?",
["Negotiate"])
# A new acceptor-side security context is created for this call from
# `server_creds`.
context = gssapi.sec_contexts.SecurityContext(creds=server_creds)
# The token is everything after the first whitespace-separated word of the
# header, concatenated without separators.
token = ''.join(req.auth.split()[1:])
# Base64-decode the token and feed it to the security context.
# NOTE(review): nothing in this block catches decode or GSSAPI errors here.
context.step(base64.b64decode(token))
# Optional credential delegation: when requested, the client must have
# delegated credentials, otherwise the request is refused with HTTPForbidden.
if delegate_credentials:
if not context.delegated_creds:
logger.debug(u"No credentials delegated for %s from %s",
req.env["PATH_INFO"], req.context.get("remote_addr"))
raise falcon.HTTPForbidden("Error", "No credential delegation enabled")
# Store the delegated credentials under a cache name derived from the
# delegated credentials' name (the 'MEMORY:' prefix presumably selects an
# in-memory Kerberos credential cache -- confirm), then publish that name
# through the KRB5CCNAME environment variable.
CCACHE = 'MEMORY:ccache_rest389_%s' % context.delegated_creds.name
store = {'ccache': CCACHE}
context.delegated_creds.store(store, overwrite=True)
# os.environ is shared by the whole process, hence the warning below.
os.environ['KRB5CCNAME'] = CCACHE # This will definitely break multithreading
# Split the repr() of the initiator name on "@" into user and realm; the
# two-target assignment requires the split to yield exactly two parts.
req.context["user"], req.context["realm"] = repr(context.initiator_name).split("@")
# NOTE(review): hard-coded placeholder value overwrites remote_addr -- looks
# like leftover debugging; confirm before relying on this key.
req.context["remote_addr"] = "bla"
# Invoke the wrapped callable with the authenticated request.
retval = func(resource, req, resp, *args, **kwargs)
# Remove the environment variable set above.
# NOTE(review): this is not in a `finally`, so it is skipped if `func` raises;
# and if it runs when KRB5CCNAME was never set it raises KeyError -- confirm
# the intended nesting. `retval` is not returned anywhere in this excerpt.
del(os.environ['KRB5CCNAME'])
# Rejection used for a header that is not a Negotiate header.
# NOTE(review): the condition guarding this raise is not visible in this
# excerpt -- presumably an `else` of a "Negotiate " prefix check; confirm.
raise falcon.HTTPUnauthorized("Unauthorized",
"Bad header, expected Negotiate",
["Negotiate"])
os.environ["KRB5_KTNAME"] = config.KERBEROS_KEYTAB
try:
server_creds = gssapi.creds.Credentials(
usage='accept',
name=gssapi.names.Name('HTTP/%s'% const.FQDN))
except gssapi.raw.exceptions.BadNameError:
logger.error("Failed initialize HTTP service principal, possibly bad permissions for %s or /etc/krb5.conf" %
config.KERBEROS_KEYTAB)
raise
context = gssapi.sec_contexts.SecurityContext(creds=server_creds)
token = ''.join(req.auth.split()[1:])
try:
context.step(b64decode(token))
except binascii.Error: # base64 errors
raise falcon.HTTPBadRequest("Bad request", "Malformed token")
except gssapi.raw.exceptions.BadMechanismError:
raise falcon.HTTPBadRequest("Bad request", "Unsupported authentication mechanism (NTLM?) was offered. Please make sure you've logged into the computer with domain user account. The web interface should not prompt for username or password.")
try:
username, realm = str(context.initiator_name).split("@")
except AttributeError: # TODO: Better exception
raise falcon.HTTPForbidden("Failed to determine username, are you trying to log in with correct domain account?")
if realm != config.KERBEROS_REALM:
def __new__(cls, base=None, token=None,
            name=None, creds=None, lifetime=None, flags=None,
            mech=None, channel_bindings=None, usage=None):
    """Allocate the instance, optionally from an exported-context token.

    When ``token`` is not ``None`` it is handed to
    ``rsec_contexts.import_sec_context`` and the result is used in place
    of ``base``; otherwise ``base`` is used unchanged. The chosen value is
    forwarded to the parent class's ``__new__``.

    The remaining parameters are accepted but not used in this method.
    """
    # Only call the import routine when a token was actually supplied.
    underlying = (
        base if token is None
        else rsec_contexts.import_sec_context(token)
    )
    return super(SecurityContext, cls).__new__(cls, underlying)