Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_raises_size_limit_exceeded_exception(self):
    """Search with ``size_limit=1`` over three fixture entries and expect
    ``LDAPSizeLimitExceededResult`` to be raised."""
    fixtures = (
        ('cn=user1,ou=test', {'userPassword': 'test1', 'revision': 1}),
        ('cn=user2,ou=test', {'userPassword': 'test2', 'revision': 2}),
        ('cn=user3,ou=test', {'userPassword': 'test3', 'revision': 3}),
    )
    connection = Connection(
        self.server,
        user='cn=user1,ou=test',
        password='test1',
        client_strategy=MOCK_SYNC,
        raise_exceptions=True,
    )

    # Register the fixture entries on the mock strategy before binding.
    for dn, attributes in fixtures:
        connection.strategy.add_entry(dn, attributes)

    connection.bind()

    # raise_exceptions=True is what turns the size-limit result into an exception.
    with self.assertRaises(LDAPSizeLimitExceededResult):
        connection.search('ou=test', '(cn=*)', size_limit=1)
args.password = getpass.getpass()
# define the server and the connection
s = Server(args.host, get_info=ALL)
if args.ssl:
s = Server(args.host, get_info=ALL, port=636, use_ssl=True)
if args.sslprotocol:
v = {'SSLv23' : 2, 'TLSv1' : 3, 'TLSv1_1' : 4, 'TLSv1_2' : 5}
if args.sslprotocol not in v.keys():
parser.print_help(sys.stderr)
sys.exit(1)
s = Server(args.host, get_info=ALL, port=636, use_ssl=True, tls=Tls(validate=0, version=v[args.sslprotocol]) )
if args.referralhosts:
s.allowed_referral_hosts = [('*', True)]
print_m('Connecting to host...')
c = Connection(s, user=args.user, password=args.password, authentication=authentication, auto_referrals=False)
print_m('Binding to host')
# perform the Bind operation
if not c.bind():
print_f('Could not bind with specified credentials')
print_f(c.result)
sys.exit(1)
print_o('Bind OK')
domainroot = s.info.other['defaultNamingContext'][0]
forestroot = s.info.other['rootDomainNamingContext'][0]
if args.forest:
dnsroot = 'CN=MicrosoftDNS,DC=ForestDnsZones,%s' % forestroot
else:
if args.legacy:
dnsroot = 'CN=MicrosoftDNS,CN=System,%s' % domainroot
else:
dnsroot = 'CN=MicrosoftDNS,DC=DomainDnsZones,%s' % domainroot
def ldap_server_test(self):
    """Check that the configured admin credentials can bind to the LDAP server.

    Opens a connection to ``self.ldap_server`` using ``self._ldap_admin_dn``
    and ``self._ldap_admin_password`` and attempts a bind.

    Returns:
        bool: True only if the connection opened and the bind succeeded;
        False otherwise. Failures are printed, never raised.
    """
    conn = None
    try:
        conn = Connection(self.ldap_server, user=self._ldap_admin_dn, password=self._ldap_admin_password,
                          check_names=True, lazy=False, raise_exceptions=False)
        conn.open()
        # With raise_exceptions=False a rejected bind does not raise; it is
        # only visible through bind()'s return value, so that must be checked.
        if not conn.bind():
            print("auth fail {}".format(conn.result))
            return False
        return True
    except Exception as e:
        print("auth fail {}".format(e))
        return False
    finally:
        # This is only a probe: release the socket instead of leaking it.
        if conn is not None and not conn.closed:
            try:
                conn.unbind()
            except Exception:
                # Best-effort cleanup; the probe result is already decided.
                pass
use_ssl = True
ldap_port = 636
if ":" in url_parsed.netloc:
ldap_host, ldap_port = url_parsed.netloc.split(":")
else:
ldap_host = url_parsed.netloc
server = ldap3.Server(ldap_host,
port=int(ldap_port),
use_ssl=use_ssl)
servers_list.append(server)
last_auth_err = ""
for bind_dn in ldap_dn_list:
c = ldap3.Connection(servers_list,
user=bind_dn % login,
password=password)
# perform the Bind operation
auth_success = c.bind()
last_auth_err = c.result
if auth_success:
break
if not auth_success:
self.trace(last_auth_err)
return auth_success
def connect(self, user, password, anonymous=False):
    """Create a ``Connection`` to ``self.ldap_server`` from ``current_app.config``.

    Args:
        user: Bind user; ignored (replaced by ``None``) when ``anonymous`` is set.
        password: Bind password; ignored (replaced by ``None``) when
            ``anonymous`` is set.
        anonymous: When truthy, use ``ANONYMOUS`` authentication instead of
            ``SIMPLE``.

    Returns:
        The constructed ``Connection`` object.
    """
    config = current_app.config

    # TLS-before-bind is used only when LDAP_USE_TLS is exactly ``True``.
    if config['LDAP_USE_TLS'] is True:
        bind_mode = AUTO_BIND_TLS_BEFORE_BIND
    else:
        bind_mode = AUTO_BIND_NO_TLS

    if anonymous:
        auth_mode = ANONYMOUS
        user = password = None
    else:
        auth_mode = SIMPLE

    return Connection(
        self.ldap_server,
        auto_bind=bind_mode,
        client_strategy=config['LDAP_CONNECTION_STRATEGY'],
        raise_exceptions=config['LDAP_RAISE_EXCEPTIONS'],
        authentication=auth_mode,
        user=user,
        password=password,
        check_names=True,
        read_only=config['LDAP_READ_ONLY'],
    )
con['raise_exceptions'] = True
con['authentication'] = ldap3.NTLM
if use_ssl:
certfile = settings.AD_CERT_FILE
self.debug('ldap.ssl :: Activated - Cert file: {}'.format(certfile))
con['auto_bind'] = ldap3.AUTO_BIND_TLS_BEFORE_BIND
ser['use_ssl'] = True
ser['tls'] = ldap3.Tls(validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1)
else:
con['auto_bind'] = ldap3.AUTO_BIND_NO_TLS
try:
#self.debug('ldap.server params :: {}'.format(ser))
server = ldap3.Server(ldap_url, **ser)
self.debug('ldap.server :: {}'.format(server))
#self.debug('ldap.connection params :: {}'.format(con))
answer = ldap3.Connection(server, **con)
self.debug('ldap.connection :: {}'.format(answer))
#answer.open()
#answer.bind()
self.debug('ldap.connected :: Authorized')
except LDAPSocketOpenError as e:
# The access for this user has been denied, Debug it if required
self.debug("LDAP connect failed 'SocketOpenError' for url '{}' with error '{}'".format(ldap_url, e))
answer = False
except LDAPException as e:
# The access for this user has been denied, Debug it if required
self.debug("LDAP connect failed 'LDAPException' for user '{}' with error '{}'".format(username, e))
answer = False
else:
# The access for this user has been denied, Debug it if required
self.debug("No password provided for user '{}'".format(username))
def action_sudoldap_delete(action, helper, wfconfig):
    """Delete the entry named by ``action['data']['dn']`` from the sudoldap server.

    Args:
        action: Mapping whose ``['data']['dn']`` is the DN to delete.
        helper: Task helper; provides ``end_event`` for failure reporting and
            ``lib.TaskFatalError``.
        wfconfig: Mapping with ``SUDO_LDAP_URL``, ``SUDO_LDAP_USER`` and
            ``SUDO_LDAP_PASS``.

    Returns:
        bool: True if the bind and the delete both succeeded. On any failure
        the event is ended with ``success=False`` and False is returned.
    """
    conn = None
    try:
        # Connect to LDAP
        conn = ldap3.Connection(
            ldap3.Server(wfconfig['SUDO_LDAP_URL']),
            wfconfig['SUDO_LDAP_USER'],
            wfconfig['SUDO_LDAP_PASS'],
            auto_bind=False
        )
        if not conn.bind():
            raise helper.lib.TaskFatalError(message="Failed to bind to the sudoldap server.")

        # Delete the entry. The connection is not created with
        # raise_exceptions, so a refused delete is reported only through the
        # return value; ignoring it would report success for a failed delete.
        # NOTE(review): this also treats "entry already absent" as a failure;
        # confirm that is the desired behaviour for re-run tasks.
        if not conn.delete(action['data']['dn']):
            raise helper.lib.TaskFatalError(message="Failed to delete the object in sudoldap.")
        return True
    except Exception:
        helper.end_event(success=False, description="Failed to delete the object in sudoldap")
        return False
    finally:
        # Release the connection on every path instead of leaking the socket.
        if conn is not None and not conn.closed:
            try:
                conn.unbind()
            except Exception:
                # Best-effort cleanup; the task outcome is already decided.
                pass
def _fetch_all_groups(self):
    """
    Run a paged search for every entry whose objectClass is
    ``GROUPS_OBJECT_CLASS`` under ``self._base_dn`` and return the parsed groups.

    Returns a dict with keys ``'Controls'`` (always ``None``), ``'Referrals'``
    (taken from the connection result) and ``'Entries'`` (output of
    ``LdapClient._parse_ldap_group_entries``).
    """
    identifier = self.GROUPS_IDENTIFIER_ATTRIBUTE
    with Connection(self._ldap_server, self._username, self._password) as conn:
        group_filter = f'(objectClass={self.GROUPS_OBJECT_CLASS})'
        found = conn.extend.standard.paged_search(
            search_base=self._base_dn,
            search_filter=group_filter,
            attributes=[identifier],
            paged_size=self._page_size,
        )

        # Referrals are read before the search results are parsed.
        # NOTE(review): if paged_search yields lazily, this reads the result
        # of an earlier operation -- confirm that is intended.
        referrals = conn.result.get('referrals')
        entries = LdapClient._parse_ldap_group_entries(found, identifier)

        return {
            'Controls': None,
            'Referrals': referrals,
            'Entries': entries,
        }
def get_ldap_connection(self):
    """Build an ldap3 ``Connection`` from this object's ``LDAP_*`` settings.

    When ``LDAP_AUTH_TYPE`` is ``"NTLM"`` the connection uses
    ``LDAP_USER_NTLM`` with ``NTLM`` authentication; otherwise it uses
    ``LDAP_USER_DN`` with ``LDAP_AUTH_TYPE`` passed through as the
    authentication method. ``auto_bind`` is not passed here.

    Returns:
        The constructed ``Connection``.

    Raises:
        ValueError: If building the server or connection fails for any
            reason; the original exception is attached as ``__cause__``.
    """
    try:
        server = Server(
            self.LDAP_SERVER,
            port=self.LDAP_PORT,
            get_info=ALL,
            use_ssl=self.LDAP_USE_SSL,
            connect_timeout=self.LDAP_CONNECT_TIMEOUT,
        )

        # Only the user and the authentication method differ between modes.
        if self.LDAP_AUTH_TYPE == "NTLM":
            user, authentication = self.LDAP_USER_NTLM, NTLM
        else:
            user, authentication = self.LDAP_USER_DN, self.LDAP_AUTH_TYPE

        return Connection(
            server=server,
            user=user,
            password=self.LDAP_PASSWORD,
            authentication=authentication,
            return_empty_attributes=True,
            raise_exceptions=True,
        )
    except Exception as err:
        # Chain explicitly so the underlying failure stays on the traceback.
        raise ValueError(
            "Cannot connect to LDAP Server. Ensure credentials are correct\n Error: {0}".format(err)
        ) from err