Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def kinit(self, principal, password):
    """
    Acquire Kerberos credentials with a password and store them in a new cache.

    A temporary directory is created to hold a ``FILE:`` credentials cache,
    ``KRB5CCNAME`` is pointed at that cache, and the acquired credentials are
    written into it.

    Args:
        principal: Principal name, interpreted as a Kerberos principal.
        password (str): Password for the principal. It is handed to GSSAPI
            as UTF-8 bytes.

    Side effects:
        Sets ``self.cache_dir`` and ``self.credentials`` and overwrites
        ``os.environ['KRB5CCNAME']``. The temporary directory is not
        removed by this method.
    """
    # Create credentials cache in temporary directory
    self.cache_dir = tempfile.mkdtemp()
    ccache = 'FILE:{}/ccache'.format(self.cache_dir)
    os.environ['KRB5CCNAME'] = ccache
    store = {b'ccache': ccache.encode('UTF-8')}

    # Acquire new credentials
    name = gssapi.Name(principal, gssapi.NameType.kerberos_principal)
    # UTF-8 instead of ASCII: ASCII-only passwords encode to the same bytes,
    # while non-ASCII passwords no longer fail with UnicodeEncodeError.
    acquire_credentials = gssapi.raw.acquire_cred_with_password(
        name, password.encode('UTF-8'))
    self.credentials = acquire_credentials.creds

    # Store credentials in the cache
    gssapi.raw.store_cred_into(
        store, self.credentials, usage='initiate', overwrite=True)
def kinit(self, principal, password):
    """
    Acquire Kerberos credentials with a password and store them in a new cache.

    A temporary directory is created to hold a ``FILE:`` credentials cache,
    ``KRB5CCNAME`` is pointed at that cache, and the acquired credentials are
    written into it.

    Args:
        principal: Principal name, interpreted as a Kerberos principal.
        password (str): Password for the principal. It is handed to GSSAPI
            as UTF-8 bytes.

    Side effects:
        Sets ``self.cache_dir`` and ``self.credentials`` and overwrites
        ``os.environ['KRB5CCNAME']``. The temporary directory is not
        removed by this method.
    """
    # Create credentials cache in temporary directory
    self.cache_dir = tempfile.mkdtemp()
    ccache = 'FILE:{}/ccache'.format(self.cache_dir)
    os.environ['KRB5CCNAME'] = ccache
    store = {b'ccache': ccache.encode('UTF-8')}

    # Acquire new credentials
    name = gssapi.Name(principal, gssapi.NameType.kerberos_principal)
    # UTF-8 instead of ASCII: ASCII-only passwords encode to the same bytes,
    # while non-ASCII passwords no longer fail with UnicodeEncodeError.
    acquire_credentials = gssapi.raw.acquire_cred_with_password(
        name, password.encode('UTF-8'))
    self.credentials = acquire_credentials.creds

    # Store credentials in the cache
    gssapi.raw.store_cred_into(
        store, self.credentials, usage='initiate', overwrite=True)
gssapi.C_PROT_READY_FLAG,
gssapi.C_INTEG_FLAG,
gssapi.C_MUTUAL_FLAG,
gssapi.C_DELEG_FLAG,
)
else:
gss_flags = (
gssapi.C_PROT_READY_FLAG,
gssapi.C_INTEG_FLAG,
gssapi.C_DELEG_FLAG,
)
# Initialize a GSS-API context.
ctx = gssapi.Context()
ctx.flags = gss_flags
krb5_oid = gssapi.OID.mech_from_string(self.krb5_mech)
target_name = gssapi.Name(
"host@" + self.targ_name, gssapi.C_NT_HOSTBASED_SERVICE
)
gss_ctxt = gssapi.InitContext(
peer_name=target_name, mech_type=krb5_oid, req_flags=ctx.flags
)
if self.server_mode:
c_token = gss_ctxt.step(c_token)
gss_ctxt_status = gss_ctxt.established
self.assertEquals(False, gss_ctxt_status)
# Accept a GSS-API context.
gss_srv_ctxt = gssapi.AcceptContext()
s_token = gss_srv_ctxt.step(c_token)
gss_ctxt_status = gss_srv_ctxt.established
self.assertNotEquals(None, s_token)
self.assertEquals(True, gss_ctxt_status)
# Establish the client context
# Excerpt from a larger test method: pick the requested context flags, open an
# initiator security context and, in server mode, start the token exchange.
# NOTE(review): indentation was lost in this excerpt, so the extent of each
# `if`/`else` body below is not recoverable from the text alone.
# Mutual authentication is requested only when self.server_mode is true; the
# other three flags are requested in both branches.
if self.server_mode:
gss_flags = (
gssapi.RequirementFlag.protection_ready,
gssapi.RequirementFlag.integrity,
gssapi.RequirementFlag.mutual_authentication,
gssapi.RequirementFlag.delegate_to_peer,
)
else:
gss_flags = (
gssapi.RequirementFlag.protection_ready,
gssapi.RequirementFlag.integrity,
gssapi.RequirementFlag.delegate_to_peer,
)
# Initialize a GSS-API context.
krb5_oid = gssapi.MechType.kerberos
# The target is the host-based service name "host@<self.targ_name>".
target_name = gssapi.Name(
"host@" + self.targ_name,
name_type=gssapi.NameType.hostbased_service,
)
gss_ctxt = gssapi.SecurityContext(
name=target_name,
flags=gss_flags,
mech=krb5_oid,
usage="initiate",
)
if self.server_mode:
# c_token is defined outside this excerpt; the step result replaces it.
c_token = gss_ctxt.step(c_token)
gss_ctxt_status = gss_ctxt.complete
# The initiator context is expected to be incomplete after this first step.
# NOTE(review): assertEquals is a deprecated alias of assertEqual and was
# removed from unittest in Python 3.12 - consider migrating.
self.assertEquals(False, gss_ctxt_status)
# Accept a GSS-API context.
gss_srv_ctxt = gssapi.SecurityContext(usage="accept")
# Feed the initiator's token to the acceptor; s_token is its reply.
s_token = gss_srv_ctxt.step(c_token)
def get_spnego_token(self):
    """
    Build and return a base64-encoded SPNEGO token for the broker service.

    The target principal is ``BROKER_USER/<self.host>@REALM``. A security
    context is initiated against it with ``self.credentials`` and only the
    first step of the exchange is performed.

    This method only returns the token; attaching it to the
    'authorization' metadata header is left to the caller.

    Returns:
        str: The first context token, base64-encoded and decoded to text.
    """
    target = gssapi.Name(
        '{}/{}@{}'.format(BROKER_USER, self.host, REALM),
        gssapi.NameType.kerberos_principal,
    )
    # Dotted OID used to select the SPNEGO mechanism.
    mech = gssapi.raw.OID.from_int_seq('1.3.6.1.5.5.2')
    initiator = gssapi.SecurityContext(
        name=target,
        mech=mech,
        usage='initiate',
        creds=self.credentials,
    )
    first_token = initiator.step()
    return b64encode(first_token).decode()
gssapi.RequirementFlag.mutual_authentication,
gssapi.RequirementFlag.delegate_to_peer,
)
else:
gss_flags = (
gssapi.RequirementFlag.protection_ready,
gssapi.RequirementFlag.integrity,
gssapi.RequirementFlag.delegate_to_peer,
)
# Initialize a GSS-API context.
krb5_oid = gssapi.MechType.kerberos
target_name = gssapi.Name(
"host@" + self.targ_name,
name_type=gssapi.NameType.hostbased_service,
)
gss_ctxt = gssapi.SecurityContext(
name=target_name,
flags=gss_flags,
mech=krb5_oid,
usage="initiate",
)
if self.server_mode:
c_token = gss_ctxt.step(c_token)
gss_ctxt_status = gss_ctxt.complete
self.assertEquals(False, gss_ctxt_status)
# Accept a GSS-API context.
gss_srv_ctxt = gssapi.SecurityContext(usage="accept")
s_token = gss_srv_ctxt.step(c_token)
gss_ctxt_status = gss_srv_ctxt.complete
self.assertNotEquals(None, s_token)
self.assertEquals(True, gss_ctxt_status)
# Establish the client context
def test_all_defaults(self, get_login):
    """
    Check the options krbContext derives when built with no arguments.

    ``get_login`` is given the return value ``'cqi'``; presumably it is a
    mock injected by a patch decorator that is not visible here. The
    cleaned options are expected to hold a user-type name for that login,
    the default credentials cache, and ``using_keytab`` set to false.
    """
    login = 'cqi'
    get_login.return_value = login

    context = krbContext()

    user_name = gssapi.names.Name(login, gssapi.names.NameType.user)
    options = context._cleaned_options
    self.assertEqual(user_name, options['principal'])
    self.assertEqual(kctx.DEFAULT_CCACHE, options['ccache'])
    self.assertFalse(options['using_keytab'])
def test_all_defaults(self):
    """
    Check the options krbContext derives in keytab mode with a principal.

    Only ``using_keytab`` and ``principal`` are passed in. The cleaned
    options are expected to keep ``using_keytab`` true, hold the principal
    as a Kerberos-principal name, and fall back to the default credentials
    cache and default keytab.
    """
    principal = 'HTTP/hostname@EXAMPLE.COM'
    context = krbContext(using_keytab=True, principal=principal)

    self.assertTrue(context._cleaned_options['using_keytab'])

    kerberos_name = gssapi.names.Name(
        principal, gssapi.names.NameType.kerberos_principal)
    options = context._cleaned_options
    self.assertEqual(kerberos_name, options['principal'])
    self.assertEqual(kctx.DEFAULT_CCACHE, options['ccache'])
    self.assertEqual(kctx.DEFAULT_KEYTAB, options['keytab'])
def setUp(self):
    """Store the test principal 'cqi' and its GSSAPI user-type name on self."""
    principal = 'cqi'
    user_type = gssapi.names.NameType.user
    self.principal = principal
    self.princ_name = gssapi.names.Name(principal, user_type)
gssapi.RequirementFlag.protection_ready,
gssapi.RequirementFlag.integrity,
gssapi.RequirementFlag.mutual_authentication,
gssapi.RequirementFlag.delegate_to_peer,
)
else:
gss_flags = (
gssapi.RequirementFlag.protection_ready,
gssapi.RequirementFlag.integrity,
gssapi.RequirementFlag.delegate_to_peer,
)
# Initialize a GSS-API context.
krb5_oid = gssapi.MechType.kerberos
target_name = gssapi.Name(
"host@" + self.targ_name,
name_type=gssapi.NameType.hostbased_service,
)
gss_ctxt = gssapi.SecurityContext(
name=target_name,
flags=gss_flags,
mech=krb5_oid,
usage="initiate",
)
if self.server_mode:
c_token = gss_ctxt.step(c_token)
gss_ctxt_status = gss_ctxt.complete
self.assertEquals(False, gss_ctxt_status)
# Accept a GSS-API context.
gss_srv_ctxt = gssapi.SecurityContext(usage="accept")
s_token = gss_srv_ctxt.step(c_token)
gss_ctxt_status = gss_srv_ctxt.complete
self.assertNotEquals(None, s_token)