Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_spnego_token(self):
    """
    Produce a base64-encoded SPNEGO token addressed to the broker service.

    The target principal is built as ``<BROKER_USER>/<self.host>@<REALM>``.
    A GSSAPI security context is initiated towards it with
    ``self.credentials`` using the SPNEGO mechanism, and the first token
    emitted by the context is returned as base64 text.

    Note that this method only returns the token; presumably the caller is
    the one that places it into the 'authorization' metadata header.
    """
    target_principal = gssapi.Name(
        '{}/{}@{}'.format(BROKER_USER, self.host, REALM),
        gssapi.NameType.kerberos_principal,
    )
    # 1.3.6.1.5.5.2 is the object identifier of the SPNEGO mechanism.
    spnego_oid = gssapi.raw.OID.from_int_seq('1.3.6.1.5.5.2')
    security_context = gssapi.SecurityContext(
        name=target_principal,
        mech=spnego_oid,
        usage='initiate',
        creds=self.credentials,
    )
    # The first step() of an initiating context yields the initial token.
    raw_token = security_context.step()
    return b64encode(raw_token).decode()
# Assertion tail of a login test. The names used here (mock_get,
# calls_before, mock_creds, mock_auth) are set up outside this fragment.
# Pick the first GET call recorded after the calls_before snapshot.
get_call = mock_get.mock_calls[calls_before]
# It should have prepared credentials with the details from config
mock_creds.assert_called_once_with(
name=gssapi.Name("someclient@EXAMPLE.COM", gssapi.NameType.kerberos_principal),
store={"client_keytab": "some-keytab", "ccache": "FILE:some-cache"},
usage="initiate",
)
# It should have prepared auth with those credentials and our configured
# server principal
mock_auth.assert_called_once_with(
creds=mock_creds.return_value,
target_name=gssapi.Name(
"SVC/hub.example.com@REALM.EXAMPLE.COM", gssapi.NameType.kerberos_principal
),
)
# It should have used the configured CA bundle when issuing the request
# (a mock call unpacks as (name, args, kwargs); index 2 is the kwargs dict)
assert get_call[2]["verify"] == "/some/ca-bundle.pem"
# Test-body fragment (the enclosing function header is not visible here).
# It builds a HubProxy over a fake transport, forces a login while
# requests_gssapi.HTTPSPNEGOAuth and gssapi.Credentials are patched, and
# then checks how credentials, auth and the CA bundle were passed along.
transport = FakeTransport()
proxy = HubProxy(conf, transport=transport)
# requests_session is presumably a fixture/mock standing in for
# requests.Session -- it is defined outside this fragment.
mock_get = requests_session.return_value.get
# Snapshot the number of GET calls recorded so far, so the call made by the
# forced login below can be picked out by index.
calls_before = len(mock_get.mock_calls)
with mock.patch("requests_gssapi.HTTPSPNEGOAuth") as mock_auth:
with mock.patch("gssapi.Credentials") as mock_creds:
# Force a login
proxy._login(force=True)
# First GET call recorded after the snapshot taken above.
get_call = mock_get.mock_calls[calls_before]
# It should have prepared credentials with the details from config
mock_creds.assert_called_once_with(
name=gssapi.Name("someclient@EXAMPLE.COM", gssapi.NameType.kerberos_principal),
store={"client_keytab": "some-keytab", "ccache": "FILE:some-cache"},
usage="initiate",
)
# It should have prepared auth with those credentials and our configured
# server principal
mock_auth.assert_called_once_with(
creds=mock_creds.return_value,
target_name=gssapi.Name(
"SVC/hub.example.com@REALM.EXAMPLE.COM", gssapi.NameType.kerberos_principal
),
)
# It should have used the configured CA bundle when issuing the request
# (a mock call unpacks as (name, args, kwargs); index 2 is the kwargs dict)
assert get_call[2]["verify"] == "/some/ca-bundle.pem"
def use_keytab(principal, keytab):
    """
    Yield an LDAP connection bound with credentials taken from a keytab.

    Credentials for ``principal`` are acquired from ``keytab`` into a
    private credential cache, an ``ldap2`` connection is opened using that
    cache (autobind disabled), and the connection is yielded.  The
    connection is disconnected when the caller is done with it -- also when
    the caller's code raises, so the connection is never left open.
    ``context.principal`` is put back to the value it had on entry (``None``
    if it was unset) in every case.

    This is a generator with a single ``yield``; presumably it is wrapped by
    ``contextlib.contextmanager`` outside the visible code.

    :param principal: Kerberos principal name to acquire credentials for
    :param keytab: keytab handed to GSSAPI as ``client_keytab``
    :raises Exception: when a ``gssapi.exceptions.GSSError`` occurs while
        acquiring credentials, connecting, or inside the caller's block
    """
    with private_ccache() as ccache_file:
        # Captured before the try block so the finally clause below can
        # always refer to it.
        old_principal = getattr(context, 'principal', None)
        try:
            name = gssapi.Name(principal, gssapi.NameType.kerberos_principal)
            store = {'ccache': ccache_file,
                     'client_keytab': keytab}
            # Acquiring the credentials populates the private ccache that
            # the LDAP connection below authenticates with.
            gssapi.Credentials(name=name, usage='initiate', store=store)
            conn = ldap2(api)
            conn.connect(ccache=ccache_file,
                         autobind=ipaldap.AUTOBIND_DISABLED)
            # Only reached once connect() succeeded, so disconnect() is
            # never called on a connection that was not established.
            try:
                yield conn
            finally:
                conn.disconnect()
        except gssapi.exceptions.GSSError as e:
            raise Exception('Unable to bind to LDAP. Error initializing '
                            'principal %s in %s: %s' % (principal, keytab,
                                                        str(e)))
        finally:
            setattr(context, 'principal', old_principal)
import requests
import gssapi
import requests_gssapi
request_args = {}
# NOTE behavior difference from hub proxy overall:
# HubProxy by default DOES NOT verify https connections :(
# See the constructor. It could be repeated here by defaulting verify to False,
# but let's not do that, instead you must have an unbroken SSL setup to
# use this auth method.
if self._conf.get("CA_CERT"):
request_args["verify"] = self._conf["CA_CERT"]
server_name = self.get_server_principal(service=service, realm=realm)
server_name = gssapi.Name(server_name, gssapi.NameType.kerberos_principal)
auth_args = {
"target_name": server_name,
}
if principal is not None:
if keytab is None:
raise ImproperlyConfigured(
"Cannot specify a principal without a keytab"
)
name = gssapi.Name(principal, gssapi.NameType.kerberos_principal)
store = {"client_keytab": keytab}
if ccache is not None:
store["ccache"] = "FILE:" + ccache
auth_args["creds"] = gssapi.Credentials(
name=name, store=store, usage="initiate"
def use_api_as_principal(principal, keytab):
with ipautil.private_ccache() as ccache_file:
try:
old_principal = getattr(context, "principal", None)
name = gssapi.Name(principal, gssapi.NameType.kerberos_principal)
store = {"ccache": ccache_file, "client_keytab": keytab}
gssapi.Credentials(name=name, usage="initiate", store=store)
# Finalize API when TGT obtained using host keytab exists
if not api.isdone("finalize"):
api.finalize()
# Now we have a TGT, connect to IPA
try:
if api.Backend.rpcclient.isconnected():
api.Backend.rpcclient.disconnect()
api.Backend.rpcclient.connect()
yield
except gssapi.exceptions.GSSError as e:
raise Exception(
"Unable to bind to IPA server. Error initializing "
def init_context(self):
    """
    Acquire the GSSAPI security context and store it on ``self._context``.

    The name type is ``kerberos_principal`` when ``self.auth_provider`` is
    the 'kerberos' entry of ``self._AUTH_PROVIDERS`` and ``user`` otherwise.
    The mechanism OID is built from ``self.auth_provider``.  When
    ``self.cbt_app_data`` is set it is wrapped in ``ChannelBindings`` and
    passed along; otherwise ``None`` is passed.
    """
    if self.auth_provider == self._AUTH_PROVIDERS['kerberos']:
        name_type = gssapi.NameType.kerberos_principal
    else:
        name_type = gssapi.NameType.user
    mech = gssapi.OID.from_int_seq(self.auth_provider)

    cbt_app_data = None
    if self.cbt_app_data is not None:
        cbt_app_data = ChannelBindings(application_data=self.cbt_app_data)

    # Arguments are handed to the logger instead of being %-formatted up
    # front, so the message is only built when debug logging is enabled.
    log.debug("GSSAPI: Acquiring security context for user %s with mech %s",
              self.username, self.auth_provider)
    self._context = GSSAPIContext._get_security_context(
        name_type, mech, self._target_spn, self.username, self.password,
        self._delegate, self.wrap_required, cbt_app_data
    )
def init_context(self):
    """
    Acquire the GSSAPI security context and store it on ``self._context``.

    The name type is ``kerberos_principal`` when ``self.auth_mech`` is the
    'kerberos' entry of ``self._AUTH_MECHANISMS`` and ``user`` otherwise.
    The mechanism OID is built from ``self.auth_mech``.
    """
    if self.auth_mech == self._AUTH_MECHANISMS['kerberos']:
        name_type = gssapi.NameType.kerberos_principal
    else:
        name_type = gssapi.NameType.user
    mech = gssapi.OID.from_int_seq(self.auth_mech)

    # Arguments are handed to the logger instead of being %-formatted up
    # front, so the message is only built when debug logging is enabled.
    log.debug("GSSAPI: Acquiring security context for user %s with mech %s",
              self.username, self.auth_mech)
    self._context = self._get_security_context(name_type, mech,
                                               self._target_spn,
                                               self.username,
                                               self.password)
The optional parameter 'attempts' specifies how many times the credential
initialization should be attempted in case of a non-responsive KDC.
"""
errors_to_retry = {KRB5KDC_ERR_SVC_UNAVAILABLE,
KRB5_KDC_UNREACH}
logger.debug("Initializing principal %s using keytab %s",
principal, keytab)
logger.debug("using ccache %s", ccache_name)
for attempt in range(1, attempts + 1):
old_config = os.environ.get('KRB5_CONFIG')
if config is not None:
os.environ['KRB5_CONFIG'] = config
else:
os.environ.pop('KRB5_CONFIG', None)
try:
name = gssapi.Name(principal, gssapi.NameType.kerberos_principal)
store = {'ccache': ccache_name,
'client_keytab': keytab}
cred = gssapi.Credentials(name=name, store=store, usage='initiate')
logger.debug("Attempt %d/%d: success", attempt, attempts)
return cred
except gssapi.exceptions.GSSError as e:
if e.min_code not in errors_to_retry: # pylint: disable=no-member
raise
logger.debug("Attempt %d/%d: failed: %s", attempt, attempts, e)
if attempt == attempts:
logger.debug("Maximum number of attempts (%d) reached",
attempts)
raise
logger.debug("Waiting 5 seconds before next retry")
time.sleep(5)
finally: