Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
c_token = gss_ctxt.step(c_token)
self.assertNotEquals(None, c_token)
# Build MIC
mic_token = gss_ctxt.get_mic(mic_msg)
if self.server_mode:
# Check MIC
status = gss_srv_ctxt.verify_mic(mic_msg, mic_token)
self.assertEquals(0, status)
elif _API == "PYTHON-GSSAPI-NEW":
if self.server_mode:
gss_flags = (
gssapi.RequirementFlag.protection_ready,
gssapi.RequirementFlag.integrity,
gssapi.RequirementFlag.mutual_authentication,
gssapi.RequirementFlag.delegate_to_peer,
)
else:
gss_flags = (
gssapi.RequirementFlag.protection_ready,
gssapi.RequirementFlag.integrity,
gssapi.RequirementFlag.delegate_to_peer,
)
# Initialize a GSS-API context.
krb5_oid = gssapi.MechType.kerberos
target_name = gssapi.Name(
"host@" + self.targ_name,
name_type=gssapi.NameType.hostbased_service,
)
gss_ctxt = gssapi.SecurityContext(
name=target_name,
flags=gss_flags,
def parse_response(self, response):
    """
    Hand the response's ``Set-Cookie`` header values to
    ``store_session_cookie()`` and then delegate the actual parsing to
    ``SSLTransport.parse_response()``, returning its result.
    """
    message = response.msg
    # Under six.PY2 the header values are read with getheaders();
    # otherwise with get_all().
    read_all = message.getheaders if six.PY2 else message.get_all
    self.store_session_cookie(read_all('Set-Cookie'))
    return SSLTransport.parse_response(self, response)
class DelegatedKerbTransport(KerbTransport):
    """
    Handles Kerberos Negotiation authentication and TGT delegation to an
    XML-RPC server.
    """

    # GSSAPI requirement flags for this transport; delegate_to_peer is
    # listed in addition to mutual authentication and out-of-sequence
    # detection.
    flags = [
        gssapi.RequirementFlag.delegate_to_peer,
        gssapi.RequirementFlag.mutual_authentication,
        gssapi.RequirementFlag.out_of_sequence_detection,
    ]
class RPCClient(Connectible):
"""
Forwarding backend plugin for XML-RPC client.
Also see the `ipaserver.rpcserver.xmlserver` plugin.
"""
# Values to set on subclasses:
session_path = None
server_proxy_class = ServerProxy
protocol = None
env_rpc_uri_key = None
except gssapi.exceptions.GSSError:
pass
if acquire_with_pass:
# error when trying to access the existing cache, get our own
# credentials with the password specified
b_password = password.encode('utf-8')
cred = acquire_cred_with_password(user, b_password,
usage='initiate',
mechs=[mech])
cred = cred.creds
flags = gssapi.RequirementFlag.confidentiality | \
gssapi.RequirementFlag.mutual_authentication | \
gssapi.RequirementFlag.integrity | \
gssapi.RequirementFlag.out_of_sequence_detection
context = gssapi.SecurityContext(name=server_name,
creds=cred,
usage='initiate',
mech=mech,
flags=flags)
return context
tls_version_min=api.env.tls_version_min,
tls_version_max=api.env.tls_version_max)
conn.connect()
logger.debug("New HTTP connection (%s)", host)
self._connection = host, conn
return self._connection[1]
class KerbTransport(SSLTransport):
"""
Handles Kerberos Negotiation authentication to an XML-RPC server.
"""
flags = [gssapi.RequirementFlag.mutual_authentication,
gssapi.RequirementFlag.out_of_sequence_detection]
def __init__(self, *args, **kwargs):
    """
    Initialise the underlying SSLTransport and set up Kerberos state.

    Recognised keyword arguments:
      service -- stored as ``self.service``; defaults to "HTTP"
      ccache  -- stored as ``self.ccache``; defaults to None
    """
    # NOTE(review): every keyword argument, including 'service' and
    # 'ccache', is forwarded to SSLTransport.__init__ unchanged --
    # confirm the parent tolerates them.
    SSLTransport.__init__(self, *args, **kwargs)
    # No security context has been established yet.
    self._sec_context = None
    self.service = kwargs.get("service", "HTTP")
    self.ccache = kwargs.get("ccache", None)
def _handle_exception(self, e, service=None):
minor = e.min_code
if minor == KRB5KDC_ERR_S_PRINCIPAL_UNKNOWN:
raise errors.ServiceError(service=service)
elif minor == KRB5_FCC_NOFILE:
raise errors.NoCCacheError()
elif minor == KRB5KRB_AP_ERR_TKT_EXPIRED:
raise errors.TicketExpired()
elif minor == KRB5_FCC_PERM:
if six.PY2:
header = response.msg.getheaders('Set-Cookie')
else:
header = response.msg.get_all('Set-Cookie')
self.store_session_cookie(header)
return SSLTransport.parse_response(self, response)
class DelegatedKerbTransport(KerbTransport):
    """
    Handles Kerberos Negotiation authentication and TGT delegation to an
    XML-RPC server.
    """

    # GSSAPI requirement flags for this transport; delegate_to_peer is
    # listed in addition to mutual authentication and out-of-sequence
    # detection.
    flags = [
        gssapi.RequirementFlag.delegate_to_peer,
        gssapi.RequirementFlag.mutual_authentication,
        gssapi.RequirementFlag.out_of_sequence_detection,
    ]
class RPCClient(Connectible):
"""
Forwarding backend plugin for XML-RPC client.
Also see the `ipaserver.rpcserver.xmlserver` plugin.
"""
# Values to set on subclasses:
session_path = None
server_proxy_class = ServerProxy
protocol = None
env_rpc_uri_key = None
def get_url_list(self, rpc_uri):
# and don't need to acquire with the password
acquire_with_pass = False
except gssapi.exceptions.GSSError:
pass
if acquire_with_pass:
# error when trying to access the existing cache, get our own
# credentials with the password specified
b_password = password.encode('utf-8')
cred = acquire_cred_with_password(user, b_password,
usage='initiate',
mechs=[mech])
cred = cred.creds
flags = gssapi.RequirementFlag.confidentiality | \
gssapi.RequirementFlag.mutual_authentication | \
gssapi.RequirementFlag.integrity | \
gssapi.RequirementFlag.out_of_sequence_detection
context = gssapi.SecurityContext(name=server_name,
creds=cred,
usage='initiate',
mech=mech,
flags=flags)
return context
def provides_mutual_auth(self):
    """
    Report whether the mutual-authentication flag is present in the
    wrapped context's ``actual_flags``.
    """
    mutual_auth = RequirementFlag.mutual_authentication
    return mutual_auth in self._ctx.actual_flags
getattr(context, 'ca_certfile', None),
tls_version_min=api.env.tls_version_min,
tls_version_max=api.env.tls_version_max)
conn.connect()
logger.debug("New HTTP connection (%s)", host)
self._connection = host, conn
return self._connection[1]
class KerbTransport(SSLTransport):
"""
Handles Kerberos Negotiation authentication to an XML-RPC server.
"""
flags = [gssapi.RequirementFlag.mutual_authentication,
gssapi.RequirementFlag.out_of_sequence_detection]
def __init__(self, *args, **kwargs):
    """
    Initialise the underlying SSLTransport and set up Kerberos state.

    Recognised keyword arguments:
      service -- stored as ``self.service``; defaults to "HTTP"
      ccache  -- stored as ``self.ccache``; defaults to None
    """
    # NOTE(review): every keyword argument, including 'service' and
    # 'ccache', is forwarded to SSLTransport.__init__ unchanged --
    # confirm the parent tolerates them.
    SSLTransport.__init__(self, *args, **kwargs)
    # No security context has been established yet.
    self._sec_context = None
    self.service = kwargs.get("service", "HTTP")
    self.ccache = kwargs.get("ccache", None)
def _handle_exception(self, e, service=None):
minor = e.min_code
if minor == KRB5KDC_ERR_S_PRINCIPAL_UNKNOWN:
raise errors.ServiceError(service=service)
elif minor == KRB5_FCC_NOFILE:
raise errors.NoCCacheError()
elif minor == KRB5KRB_AP_ERR_TKT_EXPIRED:
raise errors.TicketExpired()
raise ValueError("Can only use implicit credentials with kerberos "
"authentication")
if cred is None:
# error when trying to access the existing cache, get our own
# credentials with the password specified
b_password = to_bytes(password)
cred = gssapi.raw.acquire_cred_with_password(username, b_password,
usage='initiate',
mechs=[mech])
cred = cred.creds
flags = gssapi.RequirementFlag.mutual_authentication | \
gssapi.RequirementFlag.out_of_sequence_detection
if delegate:
flags |= gssapi.RequirementFlag.delegate_to_peer
if wrap_required:
flags |= gssapi.RequirementFlag.confidentiality
context = gssapi.SecurityContext(name=server_name,
creds=cred,
usage='initiate',
mech=mech,
flags=flags,
channel_bindings=channel_bindings)
return context
# we successfully got the Kerberos credential from the cache
# and don't need to acquire with the password
acquire_with_pass = False
except gssapi.exceptions.GSSError:
pass
if acquire_with_pass:
# error when trying to access the existing cache, get our own
# credentials with the password specified
b_password = password.encode('utf-8')
cred = acquire_cred_with_password(user, b_password,
usage='initiate',
mechs=[mech])
cred = cred.creds
flags = gssapi.RequirementFlag.confidentiality | \
gssapi.RequirementFlag.mutual_authentication | \
gssapi.RequirementFlag.integrity | \
gssapi.RequirementFlag.out_of_sequence_detection
context = gssapi.SecurityContext(name=server_name,
creds=cred,
usage='initiate',
mech=mech,
flags=flags)
return context