Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
ctx = gssapi.SecurityContext(name=target_name, mech=gssapi.MechType.kerberos, creds=creds)
in_token = None
try:
while True:
out_token = ctx.step(in_token)
if out_token is None:
out_token = ''
result = send_sasl_negotiation(connection, controls, out_token)
in_token = result['saslCreds']
try:
# This raised an exception in gssapi<1.1.2 if the context was
# incomplete, but was fixed in
# https://github.com/pythongssapi/python-gssapi/pull/70
if ctx.complete:
break
except gssapi.exceptions.MissingContextError:
pass
unwrapped_token = ctx.unwrap(in_token)
if len(unwrapped_token.message) != 4:
raise LDAPCommunicationError("Incorrect response from server")
server_security_layers = unwrapped_token.message[0]
if not isinstance(server_security_layers, int):
server_security_layers = ord(server_security_layers)
if server_security_layers in (0, NO_SECURITY_LAYER):
if unwrapped_token.message[1:] != '\x00\x00\x00':
raise LDAPCommunicationError("Server max buffer size must be 0 if no security layer")
if not (server_security_layers & NO_SECURITY_LAYER):
raise LDAPCommunicationError("Server requires a security layer, but this is not implemented")
client_security_layers = bytearray([NO_SECURITY_LAYER, 0, 0, 0])
target_name = gssapi.Name('ldap@' + connection.server.host, gssapi.NameType.hostbased_service)
ctx = gssapi.SecurityContext(name=target_name, mech=gssapi.MechType.kerberos)
in_token = None
try:
while True:
out_token = ctx.step(in_token)
if out_token is None:
out_token = ''
result = send_sasl_negotiation(connection, controls, out_token)
in_token = result['saslCreds']
try:
# noinspection PyStatementEffect
ctx.complete # This raises an exception if we haven't completed connecting.
break
except gssapi.exceptions.MissingContextError:
pass
unwrapped_token = ctx.unwrap(in_token)
if len(unwrapped_token.message) != 4:
raise LDAPCommunicationError("Incorrect response from server")
server_security_layers = unwrapped_token.message[0]
if not isinstance(server_security_layers, int):
server_security_layers = ord(server_security_layers)
if server_security_layers in (0, NO_SECURITY_LAYER):
if unwrapped_token.message[1:] != '\x00\x00\x00':
raise LDAPCommunicationError("Server max buffer size must be 0 if no security layer")
if not (server_security_layers & NO_SECURITY_LAYER):
raise LDAPCommunicationError("Server requires a security layer, but this is not implemented")
client_security_layers = bytearray([NO_SECURITY_LAYER, 0, 0, 0])
old_principal = getattr(context, "principal", None)
name = gssapi.Name(principal, gssapi.NameType.kerberos_principal)
store = {"ccache": ccache_file, "client_keytab": keytab}
gssapi.Credentials(name=name, usage="initiate", store=store)
# Finalize API when TGT obtained using host keytab exists
if not api.isdone("finalize"):
api.finalize()
# Now we have a TGT, connect to IPA
try:
if api.Backend.rpcclient.isconnected():
api.Backend.rpcclient.disconnect()
api.Backend.rpcclient.connect()
yield
except gssapi.exceptions.GSSError as e:
raise Exception(
"Unable to bind to IPA server. Error initializing "
"principal %s in %s: %s" % (principal, keytab, str(e))
)
finally:
if api.Backend.rpcclient.isconnected():
api.Backend.rpcclient.disconnect()
setattr(context, "principal", old_principal)
used as well.
For a security context of the `accept` usage, the `creds` and
`channel_bindings` arguments may optionally be used.
"""
# NB(directxman12): _last_err must be set first
self._last_err = None
# determine the usage ('initiate' vs 'accept')
if base is None and token is None:
# this will be a new context
if usage is not None:
if usage not in ('initiate', 'accept'):
msg = "Usage must be either 'initiate' or 'accept'"
raise excs.UnknownUsageError(msg, obj="security context")
self.usage = usage
elif creds is not None and creds.usage != 'both':
self.usage = creds.usage
elif name is not None:
# if we pass a name, assume the usage is 'initiate'
self.usage = 'initiate'
else:
# if we don't pass a name, assume the usage is 'accept'
self.usage = 'accept'
# check for appropriate arguments
if self.usage == 'initiate':
# takes: creds?, target_name, mech?, flags?,
# channel_bindings?
if name is None:
principal, keytab)
logger.debug("using ccache %s", ccache_name)
for attempt in range(1, attempts + 1):
old_config = os.environ.get('KRB5_CONFIG')
if config is not None:
os.environ['KRB5_CONFIG'] = config
else:
os.environ.pop('KRB5_CONFIG', None)
try:
name = gssapi.Name(principal, gssapi.NameType.kerberos_principal)
store = {'ccache': ccache_name,
'client_keytab': keytab}
cred = gssapi.Credentials(name=name, store=store, usage='initiate')
logger.debug("Attempt %d/%d: success", attempt, attempts)
return cred
except gssapi.exceptions.GSSError as e:
if e.min_code not in errors_to_retry: # pylint: disable=no-member
raise
logger.debug("Attempt %d/%d: failed: %s", attempt, attempts, e)
if attempt == attempts:
logger.debug("Maximum number of attempts (%d) reached",
attempts)
raise
logger.debug("Waiting 5 seconds before next retry")
time.sleep(5)
finally:
if old_config is not None:
os.environ['KRB5_CONFIG'] = old_config
else:
os.environ.pop('KRB5_CONFIG', None)
def ldap_connect(self):
password = self.options.password
if not password:
try:
api.Backend.ldap2.connect(ccache=os.environ.get('KRB5CCNAME'))
except (gssapi.exceptions.GSSError, errors.ACIError):
pass
else:
return
password = installutils.read_password(
"Directory Manager", confirm=False, validate=False)
if password is None:
raise admintool.ScriptError(
"Directory Manager password required")
api.Backend.ldap2.connect(bind_pw=password)