gssapi.C_DELEG_FLAG,
)
else:
gss_flags = (
gssapi.C_PROT_READY_FLAG,
gssapi.C_INTEG_FLAG,
gssapi.C_DELEG_FLAG,
)
# Initialize a GSS-API initiator (client) context for the Kerberos mechanism.
# NOTE(review): this listing has lost its indentation and the enclosing test
# method starts above the visible lines; comments describe only what is shown.
# Both flag tuples visible above include gssapi.C_DELEG_FLAG, so the context
# asks for credential delegation to the acceptor. Outside a test that should be
# a deliberate choice, limited to hosts trusted to act on the user's behalf.
# gssapi.Context() is used in the visible lines only to carry the flag tuple.
ctx = gssapi.Context()
ctx.flags = gss_flags
# Resolve the mechanism OID from the dotted string held in self.krb5_mech.
krb5_oid = gssapi.OID.mech_from_string(self.krb5_mech)
# The peer is named as a host-based service: "host@<self.targ_name>".
target_name = gssapi.Name(
"host@" + self.targ_name, gssapi.C_NT_HOSTBASED_SERVICE
)
# Create the initiator context bound to that peer name, mechanism and flags.
gss_ctxt = gssapi.InitContext(
peer_name=target_name, mech_type=krb5_oid, req_flags=ctx.flags
)
if self.server_mode:
c_token = gss_ctxt.step(c_token)
gss_ctxt_status = gss_ctxt.established
self.assertEquals(False, gss_ctxt_status)
# Accept a GSS-API context.
gss_srv_ctxt = gssapi.AcceptContext()
s_token = gss_srv_ctxt.step(c_token)
gss_ctxt_status = gss_srv_ctxt.established
self.assertNotEquals(None, s_token)
self.assertEquals(True, gss_ctxt_status)
# Establish the client context
c_token = gss_ctxt.step(s_token)
self.assertEquals(None, c_token)
else:
"host@" + self._gss_host, gssapi.C_NT_HOSTBASED_SERVICE
)
# Drive one step of the client-side GSS-API handshake and return the token to
# send to the peer.
# NOTE(review): this listing has lost its indentation and the enclosing method
# (including the start of the targ_name assignment) is above the visible lines;
# the nesting described below is inferred from the statement order.
# gssapi.Context() is used in the visible lines only to carry the flags.
# NOTE(review): self._gss_flags is set elsewhere; if it includes a delegation
# flag, the initiator's credentials are forwarded to the target host - confirm
# that this is intended for the hosts being contacted.
ctx = gssapi.Context()
ctx.flags = self._gss_flags
# Mechanism selection: with no desired_mech the configured krb5 OID is used.
if desired_mech is None:
krb5_mech = gssapi.OID.mech_from_string(self._krb5_mech)
else:
# decoder.decode() returns a pair; only the first element (the decoded OID)
# is used. presumably desired_mech is DER-encoded and may originate from the
# peer - verify against the caller.
mech, __ = decoder.decode(desired_mech)
# Allow-list of exactly one mechanism: anything other than the configured
# krb5 OID is rejected instead of being negotiated.
if mech.__str__() != self._krb5_mech:
raise SSHException("Unsupported mechanism OID.")
else:
krb5_mech = gssapi.OID.mech_from_string(self._krb5_mech)
token = None
try:
# First call (no token from the peer yet): create the initiator context and
# produce the initial token. Later calls feed the peer's token into the
# existing context.
# NOTE(review): self._gss_ctxt is only assigned in the recv_token-is-None
# branch here; the else branch relies on an earlier call having created it.
if recv_token is None:
self._gss_ctxt = gssapi.InitContext(
peer_name=targ_name,
mech_type=krb5_mech,
req_flags=ctx.flags,
)
token = self._gss_ctxt.step(token)
else:
token = self._gss_ctxt.step(recv_token)
except gssapi.GSSException:
# Re-raise as a new GSSException whose text is the original error plus the
# target host; only the formatted message is passed to the new exception.
message = "{} Target: {}".format(sys.exc_info()[1], self._gss_host)
raise gssapi.GSSException(message)
# Record whether the context is established after this step.
# NOTE(review): callers should check this status before treating the peer as
# authenticated; a returned token alone does not show the handshake finished.
self._gss_ctxt_status = self._gss_ctxt.established
return token
principal
The service principal
host
Host url where we would like to authenticate
domain
Kerberos user domain
'''
# NOTE(review): this listing has lost its indentation and the function header
# is above the visible lines; the nesting described below is inferred.
# Fail early when the optional gssapi dependency was not importable.
if not HAS_GSSAPI:
raise ImportError('The gssapi library is not imported.')
# Compose the name as "<principal>/<host>@<domain>".
service = '{0}/{1}@{2}'.format(principal, host, domain)
# Only the service name is logged; the token itself is never written to the log.
log.debug('Retrieving gsspi token for service {0}'.format(service))
# The name is imported with the C_NT_USER_NAME name type.
service_name = gssapi.Name(service, gssapi.C_NT_USER_NAME)
ctx = gssapi.InitContext(service_name)
in_token = None
while not ctx.established:
out_token = ctx.step(in_token)
# The first non-empty output token is base64-encoded and returned at once.
# NOTE(review): as written, the function returns before any reply from the
# server is processed, so ctx.established is not required to be true here and
# mutual authentication is not completed inside this function - confirm the
# caller validates the server's response if that property is needed.
# The returned value authenticates the caller to the service; handle it as a
# secret and keep it out of logs and error messages.
if out_token:
if six.PY2:
return base64.b64encode(out_token)
return base64.b64encode(salt.utils.stringutils.to_bytes(out_token))
if ctx.established:
break
# in_token is never reassigned in the visible lines, so a step that yields no
# token and leaves the context unestablished ends in this error.
if not in_token:
raise salt.exceptions.CommandExecutionError(
'Can\'t receive token, no response from server')
# Reached when the loop ends without a token having been returned.
raise salt.exceptions.CommandExecutionError(
'Context established, but didn\'t receive token')
# Drive one step of the client-side GSS-API handshake and return the token to
# send to the peer.
# NOTE(review): this listing has lost its indentation and the enclosing method
# header is above the visible lines; the nesting described below is inferred.
# The peer is named as a host-based service: "host@<self._gss_host>".
targ_name = gssapi.Name("host@" + self._gss_host,
gssapi.C_NT_HOSTBASED_SERVICE)
# gssapi.Context() is used in the visible lines only to carry the flags.
# NOTE(review): self._gss_flags is set elsewhere; if it includes a delegation
# flag, the initiator's credentials are forwarded to the target host - confirm
# that this is intended for the hosts being contacted.
ctx = gssapi.Context()
ctx.flags = self._gss_flags
# Mechanism selection: with no desired_mech the configured krb5 OID is used.
if desired_mech is None:
krb5_mech = gssapi.OID.mech_from_string(self._krb5_mech)
else:
# decoder.decode() returns a pair; only the first element (the decoded OID)
# is used. presumably desired_mech is DER-encoded and may originate from the
# peer - verify against the caller.
mech, __ = decoder.decode(desired_mech)
# Allow-list of exactly one mechanism: anything other than the configured
# krb5 OID is rejected instead of being negotiated.
if mech.__str__() != self._krb5_mech:
raise SSHException("Unsupported mechanism OID.")
else:
krb5_mech = gssapi.OID.mech_from_string(self._krb5_mech)
token = None
try:
# First call (no token from the peer yet): create the initiator context and
# produce the initial token. Later calls feed the peer's token into the
# existing context.
# NOTE(review): self._gss_ctxt is only assigned in the recv_token-is-None
# branch here; the else branch relies on an earlier call having created it.
if recv_token is None:
self._gss_ctxt = gssapi.InitContext(peer_name=targ_name,
mech_type=krb5_mech,
req_flags=ctx.flags)
token = self._gss_ctxt.step(token)
else:
token = self._gss_ctxt.step(recv_token)
except gssapi.GSSException:
# Re-raise as a new GSSException whose text is the original error plus the
# target host; only the formatted message is passed to the new exception.
raise gssapi.GSSException("{0} Target: {1}".format(sys.exc_info()[1],
self._gss_host))
# Record whether the context is established after this step.
# NOTE(review): callers should check this status before treating the peer as
# authenticated; a returned token alone does not show the handshake finished.
self._gss_ctxt_status = self._gss_ctxt.established
return token