Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_start_tls(self):
    """Open a connection, issue StartTLS and check the connection stays open.

    The test body is skipped entirely when ``test_strategy`` is one of the
    mock strategies.
    """
    if test_strategy in [MOCK_SYNC, MOCK_ASYNC]:
        return

    if not isinstance(test_server, (list, tuple)):
        # Single host: one Server carrying a Tls configuration with
        # certificate validation switched off.
        server = Server(host=test_server, port=test_port, tls=Tls(validate=ssl.CERT_NONE), get_info=test_get_info, mode=test_server_mode)
    else:
        # Several hosts: collect one Server per host in a ServerPool.
        server = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
        for pool_host in test_server:
            pool_member = Server(host=pool_host, port=test_port, allowed_referral_hosts=('*', True), get_info=test_get_info, mode=test_server_mode)
            server.add(pool_member)

    connection = Connection(
        server,
        auto_bind=False,
        version=3,
        client_strategy=test_strategy,
        user=test_user,
        password=test_password,
        authentication=test_authentication,
        lazy=test_lazy_connection,
        pool_name='pool1',
    )
    connection.open()
    connection.start_tls()
    self.assertFalse(connection.closed)
    connection.unbind()

    strategy = connection.strategy
    if strategy.pooled:
        strategy.terminate()
host, port = url.netloc.split(":")
port = int(port)
else:
host = url.netloc
if use_ssl:
port = 636
else:
port = 389
start_tls = False
if 'START_TLS' in settings and settings['START_TLS']:
start_tls = True
tls = None
if use_ssl or start_tls:
tls = ldap3.Tls()
if 'TLS_CA' in settings and settings['TLS_CA']:
tls.ca_certs_file = settings['TLS_CA']
if 'REQUIRE_TLS' in settings and settings['REQUIRE_TLS']:
tls.validate = ssl.CERT_REQUIRED
s = ldap3.Server(host, port=port, use_ssl=use_ssl, tls=tls)
c = self._connection_class(
s, # client_strategy=ldap3.STRATEGY_SYNC_RESTARTABLE,
user=user, password=password, authentication=ldap3.SIMPLE)
c.strategy.restartable_sleep_time = 0
c.strategy.restartable_tries = 1
c.raise_exceptions = True
c.open()
if not '\\' in args.user:
print_f('Username must include a domain, use: DOMAIN\\username')
sys.exit(1)
if args.password is None:
args.password = getpass.getpass()
# define the server and the connection
s = Server(args.host, get_info=ALL)
if args.ssl:
s = Server(args.host, get_info=ALL, port=636, use_ssl=True)
if args.sslprotocol:
v = {'SSLv23' : 2, 'TLSv1' : 3, 'TLSv1_1' : 4, 'TLSv1_2' : 5}
if args.sslprotocol not in v.keys():
parser.print_help(sys.stderr)
sys.exit(1)
s = Server(args.host, get_info=ALL, port=636, use_ssl=True, tls=Tls(validate=0, version=v[args.sslprotocol]) )
if args.referralhosts:
s.allowed_referral_hosts = [('*', True)]
print_m('Connecting to host...')
c = Connection(s, user=args.user, password=args.password, authentication=authentication, auto_referrals=False)
print_m('Binding to host')
# perform the Bind operation
if not c.bind():
print_f('Could not bind with specified credentials')
print_f(c.result)
sys.exit(1)
print_o('Bind OK')
domainroot = s.info.other['defaultNamingContext'][0]
forestroot = s.info.other['rootDomainNamingContext'][0]
if args.forest:
dnsroot = 'CN=MicrosoftDNS,DC=ForestDnsZones,%s' % forestroot
else:
def connect(self):
    """Build the server pool (once) and bind with the admin credentials.

    Reads LDAP_SERVERS, LDAP_BIND_ADMIN, LDAP_BIND_ADMIN_PASS and
    LDAP_CERT_FILE from ``settings`` and stores the bound connection on
    ``self.con``.

    Raises:
        ImproperlyConfigured: if any of the required settings (including
            LDAP_AD_DOMAIN, which is only checked here) is missing.
    """
    required_settings = (
        'LDAP_SERVERS',
        'LDAP_BIND_ADMIN',
        'LDAP_BIND_ADMIN_PASS',
        'LDAP_AD_DOMAIN',
        'LDAP_CERT_FILE',
    )
    if not all(hasattr(settings, name) for name in required_settings):
        raise ImproperlyConfigured()

    # The TLS configuration is shared by every server placed in the pool.
    tls = Tls(validate=ssl.CERT_OPTIONAL, version=ssl.PROTOCOL_TLSv1, ca_certs_file=settings.LDAP_CERT_FILE)

    if self.pool is None:
        self.pool = ServerPool(None, pool_strategy=FIRST, active=True)
        for entry in settings.LDAP_SERVERS:
            # Only SSL-enabled servers are added; per the original author,
            # changes cannot be made without SSL.
            if not entry['use_ssl']:
                continue
            self.pool.add(Server(entry['host'], entry['port'], entry['use_ssl'], tls=tls))

    # Bind immediately using the admin user/password from settings.
    self.con = Connection(
        self.pool,
        auto_bind=True,
        authentication=SIMPLE,
        user=settings.LDAP_BIND_ADMIN,
        password=settings.LDAP_BIND_ADMIN_PASS,
    )
self.debug('ldap.initialize :: url: {}'.format(ldap_url))

# Keyword arguments are collected in two dicts: ``ser`` is expanded into
# ldap3.Server(...) and ``con`` into ldap3.Connection(...).
ser = {}
ser['allowed_referral_hosts'] = [("*", True)]
con = {}
# NTLM user name in DOMAIN\username form. The backslash is written as "\\"
# because "\{" is an invalid escape sequence (a SyntaxWarning on recent
# Python versions); the resulting string is unchanged.
con['user'] = "{}\\{}".format(nt4_domain, username)
con['password'] = password
con['raise_exceptions'] = True
con['authentication'] = ldap3.NTLM
if use_ssl:
    certfile = settings.AD_CERT_FILE
    self.debug('ldap.ssl :: Activated - Cert file: {}'.format(certfile))
    con['auto_bind'] = ldap3.AUTO_BIND_TLS_BEFORE_BIND
    ser['use_ssl'] = True
    # NOTE(review): certfile is only logged above and never handed to Tls
    # (e.g. as ca_certs_file) -- confirm whether that is intentional.
    ser['tls'] = ldap3.Tls(validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1)
else:
    con['auto_bind'] = ldap3.AUTO_BIND_NO_TLS
try:
    # self.debug('ldap.server params :: {}'.format(ser))
    server = ldap3.Server(ldap_url, **ser)
    self.debug('ldap.server :: {}'.format(server))
    # self.debug('ldap.connection params :: {}'.format(con))
    # auto_bind is set in ``con``, so no explicit open()/bind() call is made.
    answer = ldap3.Connection(server, **con)
    self.debug('ldap.connection :: {}'.format(answer))
    self.debug('ldap.connected :: Authorized')
except LDAPSocketOpenError as e:
    # The socket to the server could not be opened; log it and report the
    # failure to the caller as None.
    self.debug("LDAP connect failed 'SocketOpenError' for url '{}' with error '{}'".format(ldap_url, e))
    answer = None
self.debug('ldap.initialize :: url: {}'.format(ldap_url))
# Prepare library
ser = {}
ser['allowed_referral_hosts'] = [("*", True)]
con = {}
con['user'] = "{}\{}".format(nt4_domain,username)
con['password'] = password
con['raise_exceptions'] = True
con['authentication'] = ldap3.NTLM
if use_ssl:
certfile = settings.AD_CERT_FILE
self.debug('ldap.ssl :: Activated - Cert file: {}'.format(certfile))
con['auto_bind'] = ldap3.AUTO_BIND_TLS_BEFORE_BIND
ser['use_ssl'] = True
ser['tls'] = ldap3.Tls(validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1)
else:
con['auto_bind'] = ldap3.AUTO_BIND_NO_TLS
try:
server = ldap3.Server(ldap_url, **ser)
self.debug('ldap.server :: {}'.format(server))
answer = ldap3.Connection(server, **con)
self.debug('ldap.connection :: {}'.format(answer))
#answer.open()
#answer.bind()
self.debug('ldap.connected :: Authorized')
except LDAPSocketOpenError as e:
# The access for this user has been denied, Debug it if required
self.debug("LDAP connect failed for url '{}' with error '{}'".format(ldap_url, e))
answer = False
except LDAPException as e:
# The access for this user has been denied, Debug it if required
def connect_gssapi(host):
    """Return a connection to *host* bound via SASL/GSSAPI after StartTLS.

    The connection is opened, upgraded with ``start_tls()`` and then bound.

    Raises:
        Exception: with the message ``"start_tls failed"`` or
            ``"bind failed"`` plus ``conn.result`` when the respective
            step returns a falsy value.
    """
    tls_config = ldap3.Tls(validate=ssl.CERT_REQUIRED)
    server = ldap3.Server(host, tls=tls_config, get_info="DSA")
    connection = ldap3.Connection(
        server,
        authentication="SASL",
        sasl_mechanism="GSSAPI",
    )

    connection.open()

    tls_started = connection.start_tls()
    if not tls_started:
        raise Exception("start_tls failed", connection.result)

    bound = connection.bind()
    if not bound:
        raise Exception("bind failed", connection.result)

    return connection
def establish_and_return_ldap_connection(host, port, use_ssl, ca_certs_file,
                                         ca_certs_data, bind_dn, bind_pw):
    """Create an auto-bound ldap3 connection to ``host:port``.

    A Tls configuration with ``ssl.CERT_REQUIRED`` is attached to the server
    only when ``ca_certs_file`` or ``ca_certs_data`` is truthy; otherwise the
    server is created with ``tls=None``.

    Returns:
        The ldap3.Connection created with ``auto_bind=True`` for ``bind_dn``
        and ``bind_pw``.
    """
    tls_settings = (
        ldap3.Tls(
            ca_certs_file=ca_certs_file,
            ca_certs_data=ca_certs_data,
            validate=ssl.CERT_REQUIRED,
        )
        if (ca_certs_file or ca_certs_data)
        else None
    )
    ldap_server = ldap3.Server(host=host, port=port, use_ssl=use_ssl, tls=tls_settings)
    bound_connection = ldap3.Connection(
        ldap_server,
        user=bind_dn,
        password=bind_pw,
        auto_bind=True,
    )
    return bound_connection
def _initialize_ldap_server(self):
    """
    Build the ldap server object for the configured connection type.

    For the 'ssl' connection type the server is created with ``use_ssl=True``;
    any other connection type yields a non encrypted server.

    :rtype: ldap3.Server
    :return: Initialized ldap server object.
    """
    if self._connection_type != 'ssl':
        # Non encrypted connection.
        return Server(host=self._host, port=self._port, connect_timeout=LdapClient.TIMEOUT)

    # A Tls configuration (certificate required, CA file taken from the
    # SSL_CERT_FILE environment variable) is attached only when verification
    # is requested; otherwise SSL is used with no Tls configuration.
    tls_config = None
    if self._verify:
        ca_file = os.environ.get('SSL_CERT_FILE')
        tls_config = Tls(validate=CERT_REQUIRED, ca_certs_file=ca_file)

    return Server(
        host=self._host,
        port=self._port,
        use_ssl=True,
        tls=tls_config,
        connect_timeout=LdapClient.TIMEOUT,
    )