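def kinit(self, principal, password):
    """Acquire an initial Kerberos credential for *principal* with *password*
    and store it in a fresh, private credentials cache.

    A new temporary directory (``tempfile.mkdtemp`` creates it with mode
    0700, so only the current user can enter it) holds a ``FILE:`` ccache, so
    the credentials never touch the user's default cache.  ``KRB5CCNAME`` in
    this process's environment is pointed at that cache; note this is a
    process-wide side effect that child processes inherit.

    Side effects on success:
      * ``self.cache_dir`` - the temporary directory.  It is NOT removed here;
        whoever owns this object must delete it (and the ccache inside) when
        the credentials are no longer needed.
      * ``self.credentials`` - the acquired gssapi credential object.
      * ``os.environ['KRB5CCNAME']`` - rewritten to the new ccache path.

    Parameters:
      principal: Kerberos principal name (str), e.g. ``user@REALM``.
      password:  the principal's password (str).  It is encoded as UTF-8;
        the previous ``ascii`` encoding raised ``UnicodeEncodeError`` for
        any non-ASCII password and is identical for ASCII input.

    Raises:
      Whatever ``gssapi`` raises from the credential exchange (wrong password,
      unknown principal, unreachable KDC, ...).  On failure the temporary
      directory is removed again and ``KRB5CCNAME`` is restored to its
      previous value, so a failed login does not leave a dangling cache path
      or an orphaned directory behind.
    """
    previous_ccache = os.environ.get('KRB5CCNAME')
    self.cache_dir = tempfile.mkdtemp()
    ccache_path = '{}/ccache'.format(self.cache_dir)
    ccache = 'FILE:{}'.format(ccache_path)
    os.environ['KRB5CCNAME'] = ccache
    # store_cred_into expects a bytes -> bytes mapping of cred-store keys.
    store = {b'ccache': ccache.encode('UTF-8')}
    succeeded = False
    try:
        name = gssapi.Name(principal, gssapi.NameType.kerberos_principal)
        # UTF-8, not ASCII: Kerberos strings are UTF-8 and non-ASCII
        # passwords must not blow up before reaching the KDC.
        acquired = gssapi.raw.acquire_cred_with_password(
            name, password.encode('utf-8'))
        self.credentials = acquired.creds
        # Write the credential into our private ccache.  usage='initiate'
        # marks it as a client credential; overwrite=True is harmless since
        # the file is brand new.
        gssapi.raw.store_cred_into(
            store, self.credentials, usage='initiate', overwrite=True)
        succeeded = True
    finally:
        if not succeeded:
            # Undo the environment change and drop the (normally empty)
            # directory so a failed attempt leaves no residue.  Errors
            # during cleanup are ignored; the original exception propagates.
            if previous_ccache is None:
                os.environ.pop('KRB5CCNAME', None)
            else:
                os.environ['KRB5CCNAME'] = previous_ccache
            for remover, target in ((os.remove, ccache_path),
                                    (os.rmdir, self.cache_dir)):
                try:
                    remover(target)
                except OSError:
                    pass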
raise IOError(
'krbContext is not running from a terminal. So, you '
'need to run kinit with your principal manually before'
' anything goes.')
# If there is no password specified via API call, prompt to
# enter one in order to continue to get credential. BUT, in
# some cases, blocking program and waiting for input of
# password is really bad, which may be only suitable for some
# simple use cases, for example, writing some scripts to test
# something that need Kerberos authentication. Anyway, whether
# it is really to enter a password from command line, it
# depends on concrete use cases totally.
password = getpass.getpass()
cred = gssapi.raw.acquire_cred_with_password(
self._cleaned_options['principal'], password.encode('utf-8'))
ccache = self._cleaned_options['ccache']
if ccache == DEFAULT_CCACHE:
gssapi.raw.store_cred(
cred.creds,
usage='initiate', overwrite=True, set_default=True,
)
else:
gssapi.raw.store_cred_into({'ccache': ccache},
cred.creds,
usage='initiate',
overwrite=True)
# raises ExpiredCredentialsError if it has expired
cred.lifetime
except gssapi.raw.GSSError:
# we can't acquire the cred if no password was supplied
if password is None:
raise
cred = None
elif username is None or password is None:
raise ValueError("Can only use implicit credentials with kerberos "
"authentication")
if cred is None:
# error when trying to access the existing cache, get our own
# credentials with the password specified
b_password = to_bytes(password)
cred = gssapi.raw.acquire_cred_with_password(username, b_password,
usage='initiate',
mechs=[mech])
cred = cred.creds
flags = gssapi.RequirementFlag.mutual_authentication | \
gssapi.RequirementFlag.out_of_sequence_detection
if delegate:
flags |= gssapi.RequirementFlag.delegate_to_peer
if wrap_required:
flags |= gssapi.RequirementFlag.confidentiality
context = gssapi.SecurityContext(name=server_name,
creds=cred,
usage='initiate',
mech=mech,
flags=flags,
kerb_mech = gssapi.OID.from_int_seq(kerb_oid)
if mech == kerb_mech:
try:
cred = gssapi.Credentials(name=user, usage='initiate',
mechs=[mech])
# we successfully got the Kerberos credential from the cache
# and don't need to acquire with the password
acquire_with_pass = False
except gssapi.exceptions.GSSError:
pass
if acquire_with_pass:
# error when trying to access the existing cache, get our own
# credentials with the password specified
b_password = password.encode('utf-8')
cred = acquire_cred_with_password(user, b_password,
usage='initiate',
mechs=[mech])
cred = cred.creds
flags = gssapi.RequirementFlag.confidentiality | \
gssapi.RequirementFlag.mutual_authentication | \
gssapi.RequirementFlag.integrity | \
gssapi.RequirementFlag.out_of_sequence_detection
context = gssapi.SecurityContext(name=server_name,
creds=cred,
usage='initiate',
mech=mech,
flags=flags)
return context