Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def wrap(self, data):
    """
    Encrypt ``data`` with the negotiated security context.

    :param data: The payload to encrypt
    :return: tuple - (header, encrypted payload) as produced by the
        mechanism that the context ended up using
    """
    negotiated_mech = self._context.mech
    ntlm_mech = gssapi.OID.from_int_seq(self._AUTH_PROVIDERS['ntlm'])

    if negotiated_mech == ntlm_mech:
        # The context is NTLM (picked directly or via SPNEGO). gss-ntlmssp
        # has no wrap_iov support, but the plain wrap call works here.
        wrapped = self._context.wrap(data, True).message

        # The NTLM header is the first 16 bytes of the wrapped message,
        # everything after it is the encrypted payload.
        return wrapped[:16], wrapped[16:]

    # Any other mech: wrap through an IOV so the header is returned in its
    # own buffer, separate from the data and padding buffers.
    buffers = IOV(IOVBufferType.header, data, IOVBufferType.padding,
                  std_layout=False)
    wrap_iov(self._context, buffers, confidential=True)

    header = buffers[0].value
    payload = buffers[1].value
    # the padding buffer may come back empty/None, treat that as no padding
    padding = buffers[2].value or b""
    return header, payload + padding
def get_available_mechs(encryption_required=False):
available_mechs = ["kerberos"]
# while kerb auth might be available, if we require wrapping and the
# extension is not available then we can't use it
if encryption_required and not HAS_GSSAPI_ENCRYPTION:
available_mechs.pop(0)
ntlm_oid = GSSAPIContext._AUTH_PROVIDERS['ntlm']
ntlm_mech = gssapi.OID.from_int_seq(ntlm_oid)
# GSS_NTLMSSP_RESET_CRYPTO_OID_LENGTH
# github.com/simo5/gss-ntlmssp/blob/master/src/gssapi_ntlmssp.h#L68
reset_mech = gssapi.OID.from_int_seq("1.3.6.1.4.1.7165.655.1.3")
try:
# we don't actually care about the account used here so just use
# a random username and password
ntlm_context = GSSAPIContext._get_security_context(
gssapi.NameType.user,
ntlm_mech,
"http@server",
"username",
"password",
False,
encryption_required
)
ntlm_context.step()
gssapi.raw.set_sec_context_option(reset_mech, context=ntlm_context,
value=b"\x00" * 4)
def init_context(self):
    """
    Create the GSSAPI security context for the configured auth provider
    and store it on ``self._context``.

    The user name type is ``kerberos_principal`` when the auth provider is
    the Kerberos OID and ``user`` for every other provider. When
    ``self.cbt_app_data`` is set it is wrapped in a ``ChannelBindings``
    object and passed along to the context, otherwise no channel bindings
    are used.
    """
    name_type = (
        gssapi.NameType.user
        if self.auth_provider != self._AUTH_PROVIDERS['kerberos']
        else gssapi.NameType.kerberos_principal
    )
    mech = gssapi.OID.from_int_seq(self.auth_provider)

    # only build the channel bindings structure when app data was supplied
    bindings = (
        None
        if self.cbt_app_data is None
        else ChannelBindings(application_data=self.cbt_app_data)
    )

    debug_msg = ("GSSAPI: Acquiring security context for user %s with mech "
                 "%s" % (self.username, self.auth_provider))
    log.debug(debug_msg)

    self._context = GSSAPIContext._get_security_context(
        name_type,
        mech,
        self._target_spn,
        self.username,
        self.password,
        self._delegate,
        self.wrap_required,
        bindings,
    )
def _get_security_context(name_type, mech, spn, username, password,
delegate, wrap_required, channel_bindings=None):
if username is not None:
username = gssapi.Name(base=username, name_type=name_type)
server_name = gssapi.Name(spn,
name_type=gssapi.NameType.hostbased_service)
# First try to get the cred from the existing cache; if that fails,
# get a new ticket with the password (if specified). The cache can
# only be used for Kerberos; NTLM/SPNEGO must acquire the cred with
# a password.
cred = None
kerb_oid = GSSAPIContext._AUTH_PROVIDERS['kerberos']
kerb_mech = gssapi.OID.from_int_seq(kerb_oid)
if mech == kerb_mech:
try:
cred = gssapi.Credentials(name=username, usage='initiate',
mechs=[mech])
# raises ExpiredCredentialsError if it has expired
cred.lifetime
except gssapi.raw.GSSError:
# we can't acquire the cred if no password was supplied
if password is None:
raise
cred = None
elif username is None or password is None:
raise ValueError("Can only use implicit credentials with kerberos "
"authentication")
if cred is None:
def _get_security_context(name_type, mech, spn, username, password):
user = gssapi.Name(base=username,
name_type=name_type)
server_name = gssapi.Name(spn,
name_type=gssapi.NameType.hostbased_service)
# acquire_cred_with_password is an expensive operation with Kerberos,
# we will first attempt to just retrieve from the local credential
# cache and if that fails we then acquire with the password. This is
# only relevant to the Kerberos mech as NTLM and SPNEGO require us to
# acquire with the password
acquire_with_pass = True
kerb_oid = GSSAPIContext._AUTH_MECHANISMS['kerberos']
kerb_mech = gssapi.OID.from_int_seq(kerb_oid)
if mech == kerb_mech:
try:
cred = gssapi.Credentials(name=user, usage='initiate',
mechs=[mech])
# we successfully got the Kerberos credential from the cache
# and don't need to acquire with the password
acquire_with_pass = False
except gssapi.exceptions.GSSError:
pass
if acquire_with_pass:
# error when trying to access the existing cache, get our own
# credentials with the password specified
b_password = password.encode('utf-8')
cred = acquire_cred_with_password(user, b_password,
usage='initiate',
def init_context(self):
    """
    Create the GSSAPI security context for the configured auth mechanism
    and store it on ``self._context``.

    The user name type is ``kerberos_principal`` when the auth mechanism
    is the Kerberos OID and ``user`` for every other mechanism.
    """
    name_type = (
        gssapi.NameType.user
        if self.auth_mech != self._AUTH_MECHANISMS['kerberos']
        else gssapi.NameType.kerberos_principal
    )
    mech = gssapi.OID.from_int_seq(self.auth_mech)

    debug_msg = ("GSSAPI: Acquiring security context for user %s with mech %s"
                 % (self.username, self.auth_mech))
    log.debug(debug_msg)

    self._context = self._get_security_context(
        name_type,
        mech,
        self._target_spn,
        self.username,
        self.password,
    )
care if SPNEGO, Kerberos and NTLM are available where NTLM is the
only wildcard that may not be available by default.
The only NTLM implementation that works properly is gss-ntlmssp and
part of this test is to verify the gss-ntlmssp OID
GSS_NTLMSSP_RESET_CRYPTO_OID_LENGTH is implemented which is required
for SPNEGO and NTLM to work properly.
:return: list - A list of supported mechs available in the installed
version of GSSAPI
"""
ntlm_oid = GSSAPIContext._AUTH_MECHANISMS['ntlm']
ntlm_mech = gssapi.OID.from_int_seq(ntlm_oid)
# GSS_NTLMSSP_RESET_CRYPTO_OID_LENGTH
# github.com/simo5/gss-ntlmssp/blob/master/src/gssapi_ntlmssp.h#L68
reset_mech = gssapi.OID.from_int_seq("1.3.6.1.4.1.7165.655.1.3")
try:
# we don't actually care about the account used here so just use
# a random username and password
ntlm_context = GSSAPIContext._get_security_context(
gssapi.NameType.user,
ntlm_mech,
"http@server",
"username",
"password"
)
ntlm_context.step()
set_sec_context_option(reset_mech, context=ntlm_context,
value=b"\x00" * 4)
except gssapi.exceptions.GSSError as exc:
# failed to init NTLM and verify gss-ntlmssp is available, this