Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_init_in_default_ccache_without_original_krb5ccname_is_set(
        self, Credentials):
    # With no ccache_file passed to krbContext, KRB5CCNAME must be absent
    # from os.environ both inside the context and after leaving it.
    # NOTE(review): presumably a decorator outside this view patches
    # os.environ so that KRB5CCNAME starts out unset -- confirm.
    expired = gssapi.exceptions.ExpiredCredentialsError(1, 1)
    # Reading .lifetime on the mocked credential raises the expired error.
    type(Credentials.return_value).lifetime = PropertyMock(side_effect=expired)

    service_principal = 'app/hostname@EXAMPLE.COM'
    with krbContext(using_keytab=True, principal=service_principal):
        self.assertNotIn('KRB5CCNAME', os.environ)

    # KRB5CCNAME was not set originally, so none may be left behind on exit.
    self.assertNotIn('KRB5CCNAME', os.environ)
def test_init_with_default_keytab(self, Credentials):
    # Inside the context, KRB5CCNAME must equal the ccache_file handed to
    # krbContext.
    ccache_path = '/tmp/my_cc'

    # Reading .lifetime on the mocked credential raises the expired error.
    lifetime_mock = PropertyMock(
        side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1))
    type(Credentials.return_value).lifetime = lifetime_mock

    with krbContext(using_keytab=True,
                    principal='app/hostname@EXAMPLE.COM',
                    ccache_file=ccache_path):
        self.assertEqual(ccache_path, os.environ['KRB5CCNAME'])
def test_init_with_entering_password_but_not_in_atty(self,
                                                     isatty,
                                                     Credentials):
    # init_with_password must raise IOError in two situations: when no
    # password argument is given at all, and when it is an empty string.
    # NOTE(review): `isatty` is presumably the mock of sys.stdin.isatty
    # reporting "not a terminal"; its patch decorator is outside this
    # view -- confirm.
    expired = gssapi.exceptions.ExpiredCredentialsError(1, 1)
    type(Credentials.return_value).lifetime = PropertyMock(side_effect=expired)

    # Case 1: password omitted entirely.
    without_password = krbContext(using_keytab=False,
                                  principal=self.principal)
    with self.assertRaises(IOError):
        without_password.init_with_password()

    # Case 2: password explicitly given but empty.
    empty_password = krbContext(using_keytab=False,
                                principal=self.principal,
                                password='')
    with self.assertRaises(IOError):
        empty_password.init_with_password()
def test_original_ccache_should_be_restored(self, Credentials):
    # While the context is active KRB5CCNAME points at the ccache_file given
    # to krbContext; after exit it must be '/tmp/my_cc' again.
    # NOTE(review): '/tmp/my_cc' is presumably placed in os.environ by a
    # decorator outside this view -- confirm.
    context_ccache = '/tmp/app_pid_cc'

    lifetime_mock = PropertyMock(
        side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1))
    type(Credentials.return_value).lifetime = lifetime_mock

    with krbContext(using_keytab=True,
                    principal='app/hostname@EXAMPLE.COM',
                    ccache_file=context_ccache):
        # The ccache passed to the context is the one in effect here.
        self.assertEqual(context_ccache, os.environ['KRB5CCNAME'])

    # The variable must still exist and carry the pre-context value.
    self.assertIn('KRB5CCNAME', os.environ)
    self.assertEqual('/tmp/my_cc', os.environ['KRB5CCNAME'])
def test_init_in_default_ccache_and_original_krb5ccname_is_set(
        self, Credentials):
    # With no ccache_file passed to krbContext, KRB5CCNAME must be absent
    # inside the context and equal to '/tmp/my_cc' after leaving it.
    # NOTE(review): '/tmp/my_cc' is presumably placed in os.environ by a
    # decorator outside this view -- confirm.
    expired = gssapi.exceptions.ExpiredCredentialsError(1, 1)
    type(Credentials.return_value).lifetime = PropertyMock(side_effect=expired)

    service_principal = 'app/hostname@EXAMPLE.COM'
    with krbContext(using_keytab=True, principal=service_principal):
        self.assertNotIn('KRB5CCNAME', os.environ)

    # After exit the variable is present again with the original value.
    self.assertIn('KRB5CCNAME', os.environ)
    self.assertEqual('/tmp/my_cc', os.environ['KRB5CCNAME'])
def test_after_init_in_default_ccache_original_ccache_should_be_restored(
        self, store_cred, acquire_cred_with_password, Credentials):
    # Password-based variant: with no ccache_file passed to krbContext,
    # KRB5CCNAME must be absent inside the context and equal to
    # '/tmp/my_cc' after leaving it.
    # NOTE(review): store_cred and acquire_cred_with_password are not used
    # in the body; presumably they are mocks injected by patch decorators
    # outside this view, as is the initial '/tmp/my_cc' value -- confirm.
    lifetime_mock = PropertyMock(
        side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1))
    type(Credentials.return_value).lifetime = lifetime_mock

    with krbContext(using_keytab=False, principal='cqi', password='security'):
        self.assertNotIn('KRB5CCNAME', os.environ)

    # After exit the variable is present again with the original value.
    self.assertIn('KRB5CCNAME', os.environ)
    self.assertEqual('/tmp/my_cc', os.environ['KRB5CCNAME'])
def test_init_in_default_ccache_with_given_keytab(self,
exists,
Credentials):
type(Credentials.return_value).lifetime = PropertyMock(
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1))
keytab = '/etc/app/app.keytab'
context = krbContext(using_keytab=True,
principal=self.service_principal,
keytab_file=keytab)
context.init_with_keytab()
Credentials.assert_has_calls([
call(usage='initiate', name=self.princ_name,
store={'client_keytab': keytab}),
call(usage='initiate', name=self.princ_name,
store={'ccache': self.tmp_ccache, 'client_keytab': keytab}),
call().store(usage='initiate', store=None,
set_default=True, overwrite=True),
])
Credentials.return_value.store.assert_called_once_with(
def test_init_with_given_keytab_and_ccache(self, exists, Credentials):
type(Credentials.return_value).lifetime = PropertyMock(
side_effect=gssapi.exceptions.ExpiredCredentialsError(1, 1))
keytab = '/etc/app/app.keytab'
ccache = '/tmp/mycc'
context = krbContext(using_keytab=True,
principal=self.service_principal,
keytab_file=keytab,
ccache_file=ccache)
context.init_with_keytab()
Credentials.assert_has_calls([
call(usage='initiate', name=self.princ_name,
store={'client_keytab': keytab, 'ccache': ccache}),
call(usage='initiate', name=self.princ_name,
store={'client_keytab': keytab, 'ccache': self.tmp_ccache}),
])
Credentials.return_value.store.assert_called_once_with(
application Kerberos authentication context, keytab has to be used.
:raises IOError: when trying to prompt to input password from command
line but no tty is available.
"""
creds_opts = {
'usage': 'initiate',
'name': self._cleaned_options['principal'],
}
if self._cleaned_options['ccache'] != DEFAULT_CCACHE:
creds_opts['store'] = {'ccache': self._cleaned_options['ccache']}
cred = gssapi.creds.Credentials(**creds_opts)
try:
cred.lifetime
except gssapi.exceptions.ExpiredCredentialsError:
password = self._cleaned_options['password']
if not password:
if not sys.stdin.isatty():
raise IOError(
'krbContext is not running from a terminal. So, you '
'need to run kinit with your principal manually before'
' anything goes.')
# If there is no password specified via API call, prompt to
# enter one in order to continue to get credential. BUT, in
# some cases, blocking program and waiting for input of
# password is really bad, which may be only suitable for some
# simple use cases, for example, writing some scripts to test
# something that need Kerberos authentication. Anyway, whether
# it is really to enter a password from command line, it
:parameters:
name
gssapi.Name object specifying principal or None for the default
ccache_name
string specifying Kerberos credentials cache name or None for the
default
:returns:
gssapi.Credentials object or None if valid credentials weren't found
'''
try:
creds = get_credentials(name=name, ccache_name=ccache_name)
if creds.lifetime > 0:
return creds
return None
except gssapi.exceptions.ExpiredCredentialsError:
return None
except gssapi.exceptions.GSSError:
return None