Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def kinit(self, principal, password):
    """Acquire Kerberos credentials with a password and cache them on disk.

    A new temporary directory is created for the credential cache and the
    process-wide ``KRB5CCNAME`` environment variable is pointed at it. The
    directory is recorded on ``self.cache_dir`` and the acquired credentials
    on ``self.credentials``; this method does not remove the directory.

    :param principal: Name of the Kerberos principal to authenticate as.
    :param password: Password for ``principal``, as text.
    """
    # Create credentials cache in temporary directory
    self.cache_dir = tempfile.mkdtemp()
    ccache = 'FILE:{}/ccache'.format(self.cache_dir)
    os.environ['KRB5CCNAME'] = ccache
    store = {b'ccache': ccache.encode('UTF-8')}

    # Acquire new credentials.
    # The password is encoded as UTF-8: ASCII passwords produce exactly the
    # same bytes as before, and non-ASCII passwords no longer fail with
    # UnicodeEncodeError.
    name = gssapi.Name(principal, gssapi.NameType.kerberos_principal)
    acquired = gssapi.raw.acquire_cred_with_password(
        name, password.encode('utf-8'))
    self.credentials = acquired.creds

    # Store credentials in the cache
    gssapi.raw.store_cred_into(
        store, self.credentials, usage='initiate', overwrite=True)
# NOTE(review): excerpt from a Negotiate (GSSAPI) request-authentication
# routine. The enclosing function that provides ``req`` and ``optional`` is
# not part of this excerpt.

# Acceptor credentials for this host's HTTP service principal.
server_creds = gssapi.creds.Credentials(
    usage='accept',
    name=gssapi.names.Name('HTTP/%s' % const.FQDN))
context = gssapi.sec_contexts.SecurityContext(creds=server_creds)

if not req.auth.startswith("Negotiate "):
    raise falcon.HTTPBadRequest("Bad request", "Bad header: %s" % req.auth)

# Everything after the scheme name is the base64-encoded token.
token = ''.join(req.auth.split()[1:])

try:
    context.step(b64decode(token))
except binascii.Error:  # base64 errors
    raise falcon.HTTPBadRequest("Bad request", "Malformed token")
except gssapi.raw.exceptions.BadMechanismError:
    raise falcon.HTTPBadRequest(
        "Bad request",
        "Unsupported authentication mechanism (NTLM?) was offered. Please make sure you've logged into the computer with domain user account. The web interface should not prompt for username or password.")

try:
    username, domain = str(context.initiator_name).split("@")
except (AttributeError, ValueError):
    # Tuple unpacking raises ValueError when the initiator name does not
    # contain exactly one "@"; without catching it the request would fail
    # with an unhandled error instead of the intended 403.
    raise falcon.HTTPForbidden("Failed to determine username, are you trying to log in with correct domain account?")

# Realm comparison is case-insensitive.
if domain.lower() != const.DOMAIN.lower():
    raise falcon.HTTPForbidden("Forbidden",
                               "Invalid realm supplied")

if username.endswith("$") and optional:
    # Extract machine hostname
    # TODO: Assert LDAP group membership
    req.context["machine"] = username[:-1].lower()
    req.context["user"] = None
def get_kerberos_principal():
    """
    Look up the current kerberos principal through gssapi.

    The name of the available initiator credentials is returned in lower
    case; some tools use it as the requester when creating tickets.

    :return: The lower-cased kerberos principal, or None when gssapi raises
        a GSSError while looking up the credentials or their name.
    """
    try:
        initiator_creds = gssapi.Credentials(usage='initiate')
        principal = str(initiator_creds.name)
    except gssapi.raw.misc.GSSError:
        principal = None
    else:
        principal = principal.lower()
    return principal
# NOTE(review): this excerpt opens in the middle of a longer comment; the
# enclosing method is not visible here.
# ... something that needs Kerberos authentication. Whether prompting for
# a password on the command line is acceptable depends entirely on the
# concrete use case.
# Read the password from an interactive prompt.
password = getpass.getpass()
# Acquire credentials for the configured principal; the password is handed
# to GSSAPI as UTF-8 bytes.
cred = gssapi.raw.acquire_cred_with_password(
self._cleaned_options['principal'], password.encode('utf-8'))
ccache = self._cleaned_options['ccache']
if ccache == DEFAULT_CCACHE:
# Default cache requested: store there and mark the credentials as default.
gssapi.raw.store_cred(
cred.creds,
usage='initiate', overwrite=True, set_default=True,
)
else:
# A specific cache was requested: store the credentials into that ccache.
gssapi.raw.store_cred_into({'ccache': ccache},
cred.creds,
usage='initiate',
overwrite=True)
# If no password was specified via the API call, prompt for one so that
# the credential can still be acquired. Blocking the program to wait for
# password input is often undesirable, though; it is mainly suitable for
# simple cases, for example scripts that test something requiring
# Kerberos authentication. Whether prompting on the command line is
# appropriate depends entirely on the concrete use case.
# NOTE(review): the condition guarding this prompt is not visible in this
# excerpt.
password = getpass.getpass()
# Acquire credentials for the configured principal; the password is handed
# to GSSAPI as UTF-8 bytes.
cred = gssapi.raw.acquire_cred_with_password(
self._cleaned_options['principal'], password.encode('utf-8'))
ccache = self._cleaned_options['ccache']
if ccache == DEFAULT_CCACHE:
# Default cache requested: store there and mark the credentials as default.
gssapi.raw.store_cred(
cred.creds,
usage='initiate', overwrite=True, set_default=True,
)
else:
# A specific cache was requested: store the credentials into that ccache.
gssapi.raw.store_cred_into({'ccache': ccache},
cred.creds,
usage='initiate',
overwrite=True)
def check_creds(options, realm_name):
    """Check whether usable default credentials match the requested principal.

    The default GSSAPI credentials are looked up first. If none are found,
    or if ``options.principal`` is set and differs from the name of those
    credentials, ``principal`` is reset to None and a temporary file is
    created for a new credential cache.

    :param options: Object whose ``principal`` attribute holds the requested
        principal, or None when no specific principal was requested.
    :param realm_name: Realm appended to ``options.principal`` when that
        value contains no "@".
    """
    # Check if ccache is available
    default_cred = None
    try:
        # Arguments are passed to the logger rather than pre-formatted, so
        # the message is only built when debug logging is enabled.
        root_logger.debug('KRB5CCNAME set to %s',
                          os.environ.get('KRB5CCNAME', None))
        # get default creds, will raise if none found
        default_cred = gssapi.creds.Credentials()
        principal = str(default_cred.name)
    except gssapi.raw.misc.GSSError as e:
        root_logger.debug('Failed to find default ccache: %s', e)
        principal = None

    # Check if the principal matches the requested one (if any)
    if principal is not None and options.principal is not None:
        op = options.principal
        if '@' not in op:
            # A bare name is qualified with the realm before comparing.
            op = '%s@%s' % (op, realm_name)
        if principal != op:
            root_logger.debug('Specified principal %s does not match '
                              'available credentials (%s)',
                              options.principal, principal)
            principal = None

    if principal is None:
        # NOTE(review): mkstemp returns an open descriptor; the code that
        # uses or closes it is not visible in this excerpt.
        (ccache_fd, ccache_name) = tempfile.mkstemp()
name_type=gssapi.NameType.hostbased_service)
# First try to get the credential from the existing cache; if that fails,
# get a new ticket with the password (if specified). The cache can only
# be used for Kerberos; for NTLM/SPNEGO the credential must be acquired
# with a password.
# NOTE(review): ``mech``, ``username`` and ``password`` come from the
# enclosing function, which is not visible in this excerpt.
cred = None
kerb_oid = GSSAPIContext._AUTH_PROVIDERS['kerberos']
kerb_mech = gssapi.OID.from_int_seq(kerb_oid)
if mech == kerb_mech:
try:
cred = gssapi.Credentials(name=username, usage='initiate',
mechs=[mech])
# Accessing the lifetime raises ExpiredCredentialsError if the credential
# has expired.
cred.lifetime
except gssapi.raw.GSSError:
# Without a password there is no way to acquire a fresh credential, so
# the original error is re-raised.
if password is None:
raise
cred = None
elif username is None or password is None:
# Mechanisms other than Kerberos need both a username and a password.
raise ValueError("Can only use implicit credentials with kerberos "
"authentication")
if cred is None:
# No usable credential came from the existing cache (or the cache was
# not consulted); acquire our own with the supplied password.
b_password = to_bytes(password)
cred = gssapi.raw.acquire_cred_with_password(username, b_password,
usage='initiate',
mechs=[mech])
# Keep only the credentials object from the acquisition result.
cred = cred.creds
"""Handle ticket created event.
"""
# NOTE(review): body of a ticket-created handler; ``tkt_file``,
# ``tkt_info_dir`` and ``tkt_spool_dir`` come from an enclosing scope that
# is not visible in this excerpt.
# The ticket file's base name is used as the principal name.
princ = os.path.basename(tkt_file)
if not _is_valid(princ):
return
infofile = os.path.join(tkt_info_dir, princ)
_LOGGER.info('Processing: %s', princ)
try:
# Point KRB5CCNAME at the spooled ticket before loading the credentials.
os.environ['KRB5CCNAME'] = os.path.join(tkt_spool_dir, princ)
creds = gssapi.creds.Credentials(usage='initiate')
# Record the expiry as the current time plus the credentials' lifetime.
with open(infofile, 'wb') as f:
f.write(json.dumps({
'expires_at': int(time.time()) + creds.lifetime
}).encode())
except gssapi.raw.GSSError as gss_err:
# On a GSSAPI error the info file is handed to fs.rm_safe; the error
# itself is not logged or re-raised here.
fs.rm_safe(infofile)
finally:
# KRB5CCNAME is always removed afterwards.
# NOTE(review): any value it held before this block is not restored.
del os.environ['KRB5CCNAME']
def check_creds(options, realm_name):
# Check if ccache is available
default_cred = None
try:
logger.debug('KRB5CCNAME set to %s',
os.environ.get('KRB5CCNAME', None))
# get default creds, will raise if none found
default_cred = gssapi.creds.Credentials()
principal = str(default_cred.name)
except gssapi.raw.misc.GSSError as e:
logger.debug('Failed to find default ccache: %s', e)
principal = None
# Check if the principal matches the requested one (if any)
if principal is not None and options.principal is not None:
op = options.principal
if op.find('@') == -1:
op = '%s@%s' % (op, realm_name)
if principal != op:
logger.debug('Specified principal %s does not match '
'available credentials (%s)',
options.principal, principal)
principal = None
if principal is None:
(ccache_fd, ccache_name) = tempfile.mkstemp()