Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
gssapi.C_PROT_READY_FLAG,
gssapi.C_INTEG_FLAG,
gssapi.C_MUTUAL_FLAG,
gssapi.C_DELEG_FLAG,
)
else:
gss_flags = (
gssapi.C_PROT_READY_FLAG,
gssapi.C_INTEG_FLAG,
gssapi.C_DELEG_FLAG,
)
# Initialize a GSS-API context.
ctx = gssapi.Context()
ctx.flags = gss_flags
krb5_oid = gssapi.OID.mech_from_string(self.krb5_mech)
target_name = gssapi.Name(
"host@" + self.targ_name, gssapi.C_NT_HOSTBASED_SERVICE
)
gss_ctxt = gssapi.InitContext(
peer_name=target_name, mech_type=krb5_oid, req_flags=ctx.flags
)
if self.server_mode:
c_token = gss_ctxt.step(c_token)
gss_ctxt_status = gss_ctxt.established
self.assertEquals(False, gss_ctxt_status)
# Accept a GSS-API context.
gss_srv_ctxt = gssapi.AcceptContext()
s_token = gss_srv_ctxt.step(c_token)
gss_ctxt_status = gss_srv_ctxt.established
self.assertNotEquals(None, s_token)
self.assertEquals(True, gss_ctxt_status)
# Establish the client context
if self.server_mode:
gss_flags = (
gssapi.RequirementFlag.protection_ready,
gssapi.RequirementFlag.integrity,
gssapi.RequirementFlag.mutual_authentication,
gssapi.RequirementFlag.delegate_to_peer,
)
else:
gss_flags = (
gssapi.RequirementFlag.protection_ready,
gssapi.RequirementFlag.integrity,
gssapi.RequirementFlag.delegate_to_peer,
)
# Initialize a GSS-API context.
krb5_oid = gssapi.MechType.kerberos
target_name = gssapi.Name(
"host@" + self.targ_name,
name_type=gssapi.NameType.hostbased_service,
)
gss_ctxt = gssapi.SecurityContext(
name=target_name,
flags=gss_flags,
mech=krb5_oid,
usage="initiate",
)
if self.server_mode:
c_token = gss_ctxt.step(c_token)
gss_ctxt_status = gss_ctxt.complete
self.assertEquals(False, gss_ctxt_status)
# Accept a GSS-API context.
gss_srv_ctxt = gssapi.SecurityContext(usage="accept")
s_token = gss_srv_ctxt.step(c_token)
Get the gssapi token for Kerberos connection
principal
The service principal
host
Host url where we would like to authenticate
domain
Kerberos user domain
'''
if not HAS_GSSAPI:
raise ImportError('The gssapi library is not imported.')
service = '{0}/{1}@{2}'.format(principal, host, domain)
log.debug('Retrieving gsspi token for service {0}'.format(service))
service_name = gssapi.Name(service, gssapi.C_NT_USER_NAME)
ctx = gssapi.InitContext(service_name)
in_token = None
while not ctx.established:
out_token = ctx.step(in_token)
if out_token:
if six.PY2:
return base64.b64encode(out_token)
return base64.b64encode(salt.utils.stringutils.to_bytes(out_token))
if ctx.established:
break
if not in_token:
raise salt.exceptions.CommandExecutionError(
'Can\'t receive token, no response from server')
raise salt.exceptions.CommandExecutionError(
'Context established, but didn\'t receive token')
def _get_security_context(name_type, mech, spn, username, password):
user = gssapi.Name(base=username,
name_type=name_type)
server_name = gssapi.Name(spn,
name_type=gssapi.NameType.hostbased_service)
# acquire_cred_with_password is an expensive operation with Kerberos,
# we will first attempt to just retrieve from the local credential
# cache and if that fails we then acquire with the password. This is
# only relevant to the Kerberos mech as NTLM and SPNEGO require us to
# acquire with the password
acquire_with_pass = True
kerb_oid = GSSAPIContext._AUTH_MECHANISMS['kerberos']
kerb_mech = gssapi.OID.from_int_seq(kerb_oid)
if mech == kerb_mech:
try:
cred = gssapi.Credentials(name=user, usage='initiate',
mechs=[mech])
# we successfully got the Kerberos credential from the cache
# and don't need to acquire with the password
def get(self, path):
""" Perform a GET request with GSSAPI authentication """
# Generate token
service_name = gssapi.Name('HTTP@{0}'.format(self.url.netloc),
gssapi.NameType.hostbased_service)
ctx = gssapi.SecurityContext(usage="initiate", name=service_name)
data = b64encode(ctx.step()).decode()
# Make the connection
connection = http.client.HTTPSConnection(self.url.netloc, 443)
log.debug("GET {0}".format(path))
connection.putrequest("GET", path)
connection.putheader("Authorization", "Negotiate {0}".format(data))
connection.putheader("Referer", self.url_string)
connection.endheaders()
# Perform the request, convert response into lines
response = connection.getresponse()
if response.status != 200:
raise ReportError(
def connect(self):
    """Connect to the server and run the client side of the GSSAPI handshake.

    A TCP socket is opened to ``(self.host, self.port)``; the connection
    attempt is bounded by ``self.connect_timeout``.  Once connected, the
    socket is switched back to blocking mode and wrapped in a binary
    read/write file object stored on ``self.stream``.

    An initiator security context for ``self.service_name`` (interpreted as
    a host-based service name) is then stored on ``self.ctx`` and stepped
    until it reports completion.  Tokens are exchanged with the peer as
    base64-encoded lines via ``self._write_line`` and ``self._read_line``.

    Returns:
        ``False`` if the TCP connection could not be established.
        NOTE(review): the visible code has no explicit return after the
        handshake loop, so the success path returns ``None`` here; confirm
        against the callers' expectations.
    """
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_address = (self.host, self.port)
    try:
        self.sock.settimeout(self.connect_timeout)
        self.sock.connect(server_address)
    except socket.error:
        _LOGGER.debug('Connection timeout: %s:%s', self.host, self.port)
        # Release the descriptor right away: without this, every failed
        # attempt leaves an open socket behind until it is garbage
        # collected.
        self.sock.close()
        return False

    # The timeout only applies to connection establishment; the handshake
    # and everything after it use blocking I/O.
    self.sock.settimeout(None)
    self.stream = self.sock.makefile(mode='rwb')

    service_name = gssapi.Name(
        self.service_name,
        name_type=gssapi.NameType.hostbased_service
    )
    self.ctx = gssapi.SecurityContext(name=service_name, usage='initiate')

    # The first step takes no input token; each later step consumes the
    # peer's most recent reply.
    in_token = None
    while not self.ctx.complete:
        out_token = self.ctx.step(in_token)
        if out_token:
            out_encoded = base64.standard_b64encode(out_token)
            self._write_line(out_encoded)
        # The step above may have completed the context, in which case no
        # further reply is read from the peer.
        if self.ctx.complete:
            break
        in_encoded = self._read_line()
        in_token = base64.standard_b64decode(in_encoded)
def _get_security_context(name_type, mech, spn, username, password):
user = gssapi.Name(base=username,
name_type=name_type)
server_name = gssapi.Name(spn,
name_type=gssapi.NameType.hostbased_service)
# acquire_cred_with_password is an expensive operation with Kerberos,
# we will first attempt to just retrieve from the local credential
# cache and if that fails we then acquire with the password. This is
# only relevant to the Kerberos mech as NTLM and SPNEGO require us to
# acquire with the password
acquire_with_pass = True
kerb_oid = GSSAPIContext._AUTH_MECHANISMS['kerberos']
kerb_mech = gssapi.OID.from_int_seq(kerb_oid)
if mech == kerb_mech:
try:
cred = gssapi.Credentials(name=user, usage='initiate',
mechs=[mech])