Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_gssapi_get_sec_context_auto_implicit(self):
gss = pytest.importorskip("gssapi")
name_type = gss.NameType.user
mech = gss.OID.from_int_seq(GSSAPIContext._AUTH_PROVIDERS['auto'])
spn = "http@hostname"
username = None
password = None
delegate = False
wrap_required = False
cbt = None
with pytest.raises(ValueError) as err:
GSSAPIContext._get_security_context(name_type, mech, spn, username,
password, delegate,
wrap_required, cbt)
assert str(err.value) == "Can only use implicit credentials with " \
"kerberos authentication"
def test_gssapi_step(self):
class MockSecContext(object):
def __init__(self):
self.complete = False
self._first = True
def step(self, in_token=None):
if self._first:
self._first = False
return b"token"
else:
self.complete = True
return in_token
context = GSSAPIContext(None, None, "auto", None, "hostname", "http",
True, True)
context._context = MockSecContext()
assert context.complete is False
gen = context.step()
actual = next(gen)
assert actual == b"token"
assert context.complete is False
actual2 = gen.send(b"new token")
assert actual2 == b"new token"
assert context.complete
def test_get_auth_gssapi_auto_successful(self, monkeypatch):
pytest.importorskip("gssapi")
def _step(self, token=None):
yield b"token"
mock_set_sec = MagicMock()
mock_init = MagicMock()
monkeypatch.setattr('gssapi.raw.set_sec_context_option', mock_set_sec)
monkeypatch.setattr(GSSAPIContext, "init_context", mock_init)
monkeypatch.setattr(GSSAPIContext, "step", _step)
context, gen, token = get_auth_context("", "", "auto", None, "host",
"service", False, False)
assert isinstance(context, GSSAPIContext)
assert context.auth_provider == "1.3.6.1.5.5.2"
assert mock_init.call_count == 1
assert isinstance(gen, types.GeneratorType)
assert token == b"token"
def test_gssapi_init_context_kerberos(self, monkeypatch):
gss = pytest.importorskip("gssapi")
mock_con = MagicMock()
monkeypatch.setattr(GSSAPIContext, "_get_security_context", mock_con)
context = GSSAPIContext("user", "pass", "kerberos", None, "hostname",
"http", True, True)
context.init_context()
name, mech, spn, user, password, delegate, wrap, cbt = \
mock_con.call_args[0]
assert name == gss.NameType.kerberos_principal
assert mech == \
gss.OID.from_int_seq(GSSAPIContext._AUTH_PROVIDERS['kerberos'])
assert spn == "http@hostname"
assert user == "user"
assert password == "pass"
assert delegate is True
assert wrap is True
assert cbt is None
def test_gssapi_init_context_ntlm(self, monkeypatch):
gss = pytest.importorskip("gssapi")
mock_con = MagicMock()
monkeypatch.setattr(GSSAPIContext, "_get_security_context", mock_con)
context = GSSAPIContext("user", "pass", "ntlm", None, "hostname",
"http", True, True)
context.init_context()
name, mech, spn, user, password, delegate, wrap, cbt = \
mock_con.call_args[0]
assert name == gss.NameType.user
assert mech == \
gss.OID.from_int_seq(GSSAPIContext._AUTH_PROVIDERS['ntlm'])
assert spn == "http@hostname"
assert user == "user"
assert password == "pass"
assert delegate is True
assert wrap is True
assert cbt is None
def test_gssapi_init_context_kerberos(self, monkeypatch):
gss = pytest.importorskip("gssapi")
mock_con = MagicMock()
monkeypatch.setattr(GSSAPIContext, "_get_security_context", mock_con)
context = GSSAPIContext("user", "pass", "kerberos", None, "hostname",
"http", True, True)
context.init_context()
name, mech, spn, user, password, delegate, wrap, cbt = \
mock_con.call_args[0]
assert name == gss.NameType.kerberos_principal
assert mech == \
gss.OID.from_int_seq(GSSAPIContext._AUTH_PROVIDERS['kerberos'])
assert spn == "http@hostname"
assert user == "user"
assert password == "pass"
assert delegate is True
assert wrap is True
assert cbt is None
if auth_provider not in ["auto", "kerberos", "ntlm"]:
raise ValueError("Invalid auth_provider specified %s, must be "
"auto, kerberos, or ntlm" % auth_provider)
context_gen = None
out_token = None
if HAS_SSPI:
# always use SSPI when available
log.debug("SSPI is available and will be used as the auth backend")
context = SSPIContext(username, password, auth_provider, cbt_app_data,
hostname, service, delegate)
elif HAS_GSSAPI and auth_provider != "ntlm":
log.debug("GSSAPI is available, determine if it can handle the auth "
"provider specified or whether the NTLM fallback is used")
mechs_available = GSSAPIContext.get_available_mechs(wrap_required)
if auth_provider in mechs_available:
log.debug("GSSAPI with mech %s is being used as the auth backend"
% auth_provider)
context = GSSAPIContext(username, password, auth_provider,
cbt_app_data, hostname, service, delegate,
wrap_required)
elif auth_provider == "kerberos":
raise ValueError("The auth_provider specified 'kerberos' is not "
"available as message encryption is required but "
"is not available on the current system. Either "
"disable encryption, use https or specify "
"auto/ntlm")
elif auth_provider == "auto" and "kerberos" in mechs_available:
log.debug("GSSAPI is available but SPNEGO/NTLM is not natively "
"supported, try to use Kerberos explicitly and fallback "
def __init__(self, username, password, auth_provider, cbt_app_data,
hostname, service, delegate, wrap_required):
super(GSSAPIContext, self).__init__(password, auth_provider,
cbt_app_data)
self._username = username
self._target_spn = "%s@%s" % (service.lower(), hostname)
self._delegate = delegate
self.wrap_required = wrap_required
def init_context(self):
if self.auth_provider != self._AUTH_PROVIDERS['kerberos']:
name_type = gssapi.NameType.user
else:
name_type = gssapi.NameType.kerberos_principal
mech = gssapi.OID.from_int_seq(self.auth_provider)
cbt_app_data = None
if self.cbt_app_data is not None:
cbt_app_data = ChannelBindings(application_data=self.cbt_app_data)
log.debug("GSSAPI: Acquiring security context for user %s with mech "
"%s" % (self.username, self.auth_provider))
self._context = GSSAPIContext._get_security_context(
name_type, mech, self._target_spn, self.username, self.password,
self._delegate, self.wrap_required, cbt_app_data
)