Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_spnego_token(self):
    """
    Build a base64-encoded SPNEGO token addressed to the broker service.

    The target principal is formatted as ``BROKER_USER/<self.host>@REALM``
    and an initiator security context is created for it with
    ``self.credentials`` and the mechanism OID ``1.3.6.1.5.5.2``.  Only the
    first context-establishment step is performed.

    Returns the output of that step, base64-encoded and decoded to ``str``.
    The token is returned to the caller; no header is set here.
    """
    principal = '{}/{}@{}'.format(BROKER_USER, self.host, REALM)
    target = gssapi.Name(principal, gssapi.NameType.kerberos_principal)
    mechanism = gssapi.raw.OID.from_int_seq('1.3.6.1.5.5.2')
    initiator = gssapi.SecurityContext(
        name=target,
        mech=mechanism,
        usage='initiate',
        creds=self.credentials,
    )
    # step() with no argument yields the initial token for the peer.
    first_token = initiator.step()
    return b64encode(first_token).decode()
# NOTE(review): this span starts in the middle of a definition. The opening of
# the tuple closed by the first `)` below, and the `if` that pairs with the
# `else:`, are not visible here.
gssapi.RequirementFlag.mutual_authentication,
gssapi.RequirementFlag.delegate_to_peer,
)
else:
# Alternative flag set: protection_ready, integrity and delegate_to_peer.
gss_flags = (
gssapi.RequirementFlag.protection_ready,
gssapi.RequirementFlag.integrity,
gssapi.RequirementFlag.delegate_to_peer,
)
# Initialize a GSS-API context.
krb5_oid = gssapi.MechType.kerberos
# The target is the host-based service name "host@<self.targ_name>".
target_name = gssapi.Name(
"host@" + self.targ_name,
name_type=gssapi.NameType.hostbased_service,
)
# Initiator-side context using the flags and Kerberos mechanism chosen above.
gss_ctxt = gssapi.SecurityContext(
name=target_name,
flags=gss_flags,
mech=krb5_oid,
usage="initiate",
)
if self.server_mode:
# NOTE(review): `c_token` is read here before any assignment visible in this
# span; presumably it is initialised earlier in the enclosing test - confirm.
c_token = gss_ctxt.step(c_token)
gss_ctxt_status = gss_ctxt.complete
# The initiator context is expected to be incomplete after its first step.
# NOTE(review): assertEquals / assertNotEquals are deprecated unittest aliases
# of assertEqual / assertNotEqual and were removed in Python 3.12.
self.assertEquals(False, gss_ctxt_status)
# Accept a GSS-API context.
gss_srv_ctxt = gssapi.SecurityContext(usage="accept")
# Feed the client token to the acceptor; it must produce a reply token and
# report the acceptor context as complete.
s_token = gss_srv_ctxt.step(c_token)
gss_ctxt_status = gss_srv_ctxt.complete
self.assertNotEquals(None, s_token)
self.assertEquals(True, gss_ctxt_status)
# Establish the client context
# NOTE(review): the statement introduced by the comment above is not visible
# in this span.
# NOTE(review): the enclosing definition's header is not visible in this span.
# If a session cookie is stored on `context`, send it as a Cookie header and
# return without performing any GSS-API negotiation.
session_cookie = getattr(context, 'session_cookie', None)
if session_cookie:
self._extra_headers.append(('Cookie', session_cookie))
return
# Set the remote host principal
host = self._get_host()
# Service name is "<self.service>@<host>", keeping only the part of the host
# string before the first ':'.
service = self.service + "@" + host.split(':')[0]
try:
# `creds` stays None unless a credential cache is configured on self.ccache.
creds = None
if self.ccache:
creds = gssapi.Credentials(usage='initiate',
store={'ccache': self.ccache})
name = gssapi.Name(service, gssapi.NameType.hostbased_service)
self._sec_context = gssapi.SecurityContext(creds=creds, name=name,
flags=self.flags)
# First context-establishment step, called without an input token.
response = self._sec_context.step()
except gssapi.exceptions.GSSError as e:
self._handle_exception(e, service=service)
# NOTE(review): assuming this line runs after the try/except, `response` is
# unbound if a GSSError was caught and _handle_exception returned instead of
# raising - confirm that _handle_exception always raises.
self._set_auth_header(response)
def get(self, path):
    """
    Perform a GET request with GSSAPI authentication.

    A token for the host-based service ``HTTP@<self.url.netloc>`` is
    produced by the first step of a fresh initiator context and sent as
    ``Authorization: Negotiate <base64 token>`` in an HTTPS request to
    ``self.url.netloc`` on port 443.  ``self.url_string`` is sent as the
    ``Referer`` header.

    The connection is always closed before this method exits, both after
    a successful read and when an error is raised.

    :param path: request target passed to ``putrequest``.
    :raises ReportError: when the response status is not 200.
    """
    # Generate token
    service_name = gssapi.Name(
        'HTTP@{0}'.format(self.url.netloc),
        gssapi.NameType.hostbased_service)
    ctx = gssapi.SecurityContext(usage="initiate", name=service_name)
    data = b64encode(ctx.step()).decode()
    # Make the connection
    connection = http.client.HTTPSConnection(self.url.netloc, 443)
    try:
        log.debug("GET {0}".format(path))
        connection.putrequest("GET", path)
        connection.putheader("Authorization", "Negotiate {0}".format(data))
        connection.putheader("Referer", self.url_string)
        connection.endheaders()
        # Perform the request, convert response into lines
        response = connection.getresponse()
        if response.status != 200:
            raise ReportError(
                "Failed to fetch tickets: {0}".format(response.status))
        # The body is read in full here, so closing the connection afterwards
        # loses nothing.  The first line of the decoded body is dropped.
        # NOTE(review): `lines` is neither returned nor used in the visible
        # code; the rest of this method may be missing - confirm.
        lines = response.read().decode("utf8").strip().split("\n")[1:]
    finally:
        # Release the socket on success and on every error path.
        connection.close()
def sasl_gssapi(connection, controls):
"""
Performs a bind using the Kerberos v5 ("GSSAPI") SASL mechanism
from RFC 4752. Does not support any security layers, only authentication!
"""
# NOTE(review): this definition is cut off in this view; the handler for the
# outer `try:` below is not visible.
# Target is the host-based service "ldap@<connection.server.host>".
target_name = gssapi.Name('ldap@' + connection.server.host, gssapi.NameType.hostbased_service)
ctx = gssapi.SecurityContext(name=target_name, mech=gssapi.MechType.kerberos)
# The first step is made without an input token.
in_token = None
try:
while True:
out_token = ctx.step(in_token)
# Send an empty string when the step produced no token.
if out_token is None:
out_token = ''
result = send_sasl_negotiation(connection, controls, out_token)
# The server's reply token is read from the 'saslCreds' entry of the result.
in_token = result['saslCreds']
try:
# noinspection PyStatementEffect
ctx.complete # This raises an exception if we haven't completed connecting.
break
except gssapi.exceptions.MissingContextError:
# Context not established yet: go round the loop again with the new in_token.
pass
# Unwrap the last token received from the server using the security context.
unwrapped_token = ctx.unwrap(in_token)
# NOTE(review): the enclosing definition's header is not visible in this span.
server_address = (self.host, self.port)
try:
# Apply the connect timeout only while establishing the connection.
self.sock.settimeout(self.connect_timeout)
self.sock.connect(server_address)
except socket.error:
# NOTE(review): every socket.error is logged as a timeout, whatever its cause.
_LOGGER.debug('Connection timeout: %s:%s', self.host, self.port)
return False
# Remove the timeout (blocking mode) once connected.
self.sock.settimeout(None)
# Binary read/write file object over the socket.
self.stream = self.sock.makefile(mode='rwb')
service_name = gssapi.Name(
self.service_name,
name_type=gssapi.NameType.hostbased_service
)
self.ctx = gssapi.SecurityContext(name=service_name, usage='initiate')
# The first step is made without an input token.
in_token = None
# Token exchange: each outgoing token is base64-encoded and written with
# _write_line; each reply is read with _read_line and base64-decoded.
while not self.ctx.complete:
out_token = self.ctx.step(in_token)
if out_token:
out_encoded = base64.standard_b64encode(out_token)
self._write_line(out_encoded)
# Stop without waiting for a reply once the context reports completion.
if self.ctx.complete:
break
in_encoded = self._read_line()
in_token = base64.standard_b64decode(in_encoded)
# An empty decoded reply is treated as a failure.
if not in_token:
raise GSSError('No response from server.')
_LOGGER.debug('Successfully authenticated.')
def initial_step(self, request, response=None):
    """
    Run one GSS-API step and attach the resulting token to ``request``.

    When ``self.context`` is still ``None`` an initiator security context
    is created first, using credentials from the ccache named by
    ``self.ccache_name`` and the host-based service
    ``HTTP@<self.target_host>``.

    The input token is taken from ``response`` via
    ``self._get_negotiate_token`` and the step's output is handed to
    ``self._set_authz_header`` together with ``request``.
    """
    if self.context is None:
        credentials = gssapi.Credentials(
            usage='initiate',
            store={'ccache': self.ccache_name},
        )
        target = gssapi.Name(
            'HTTP@{0}'.format(self.target_host),
            name_type=gssapi.NameType.hostbased_service,
        )
        self.context = gssapi.SecurityContext(
            creds=credentials,
            name=target,
            usage='initiate',
        )

    server_token = self._get_negotiate_token(response)
    client_token = self.context.step(server_token)
    self._set_authz_header(request, client_token)
def _auth_header(self):
    """
    Return a dict holding a fresh ``Authorization: Negotiate`` header.

    ``self.creds`` is replaced with the result of ``self._init_creds()``
    when its ``lifetime`` is below 300.  A new security context for
    ``self.service_name`` is then created and its first step is
    base64-encoded into the header value.
    """
    needs_refresh = self.creds.lifetime < 300
    if needs_refresh:
        self.creds = self._init_creds()

    context = gssapi.SecurityContext(
        name=self.service_name,
        creds=self.creds,
    )
    token = context.step()
    encoded = b64encode(token).decode('ascii')
    return {'Authorization': 'Negotiate %s' % encoded}
# NOTE(review): the enclosing definition's header is not visible in this span;
# `username`, `password`, `mech`, `delegate`, `wrap_required`, `server_name`
# and `channel_bindings` are bound outside it.
# error when trying to access the existing cache, get our own
# credentials with the password specified
b_password = to_bytes(password)
cred = gssapi.raw.acquire_cred_with_password(username, b_password,
usage='initiate',
mechs=[mech])
# Keep only the `creds` attribute of the acquisition result.
cred = cred.creds
# Mutual authentication and out-of-sequence detection are always requested;
# delegation and confidentiality are added only when asked for.
flags = gssapi.RequirementFlag.mutual_authentication | \
gssapi.RequirementFlag.out_of_sequence_detection
if delegate:
flags |= gssapi.RequirementFlag.delegate_to_peer
if wrap_required:
flags |= gssapi.RequirementFlag.confidentiality
# Initiator context for `server_name` built from the credential and flags above.
context = gssapi.SecurityContext(name=server_name,
creds=cred,
usage='initiate',
mech=mech,
flags=flags,
channel_bindings=channel_bindings)
return context
# NOTE(review): this span starts inside a definition whose header, and the
# `if` that pairs with the `else:` below, are not visible here.
# Reverse-resolve the peer's address to a host name for the service principal.
hostname = socket.gethostbyaddr(connection.socket.getpeername()[0])[0]
target_name = gssapi.Name('ldap@' + hostname, gssapi.NameType.hostbased_service)
else:
# Otherwise the first sasl_credentials entry is used as the host part.
target_name = gssapi.Name('ldap@' + connection.sasl_credentials[0], gssapi.NameType.hostbased_service)
# Optional second entry: authorization id, UTF-8 encoded.
if len(connection.sasl_credentials) >= 2 and connection.sasl_credentials[1]:
authz_id = connection.sasl_credentials[1].encode("utf-8")
# Optional third entry: raw credentials used as the base for gssapi.Credentials.
if len(connection.sasl_credentials) >= 3 and connection.sasl_credentials[2]:
raw_creds = connection.sasl_credentials[2]
# Fall back to the server host when no target name was chosen above.
if target_name is None:
target_name = gssapi.Name('ldap@' + connection.server.host, gssapi.NameType.hostbased_service)
# Credentials come from raw_creds when given, else from connection.user when
# set, else None.
if raw_creds is not None:
creds = gssapi.Credentials(base=raw_creds, usage='initiate', store=connection.cred_store)
else:
creds = gssapi.Credentials(name=gssapi.Name(connection.user), usage='initiate', store=connection.cred_store) if connection.user else None
ctx = gssapi.SecurityContext(name=target_name, mech=gssapi.MechType.kerberos, creds=creds)
# The first step is made without an input token.
in_token = None
# NOTE(review): the handler for the `try:` below is not visible in this span.
try:
while True:
out_token = ctx.step(in_token)
# Send an empty string when the step produced no token.
if out_token is None:
out_token = ''
result = send_sasl_negotiation(connection, controls, out_token)
# The server's reply token is read from the 'saslCreds' entry of the result.
in_token = result['saslCreds']
try:
# This raised an exception in gssapi<1.1.2 if the context was
# incomplete, but was fixed in
# https://github.com/pythongssapi/python-gssapi/pull/70
if ctx.complete:
break
except gssapi.exceptions.MissingContextError:
pass