Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
super().setUp()
server = ldap3.Server(self.LDAP_URI, get_info=ldap3.ALL)
with ldap3.Connection(server, auto_bind=True,
client_strategy=ldap3.SYNC,
user=self.LDAP_ADMIN_UID,
password=self.LDAP_ADMIN_PASSWORD,
authentication=ldap3.SIMPLE) as conn:
self.conn = conn
self.delete_everything_below_base()
result = conn.add(self.LDAP_USER_BASE, 'organizationalUnit')
if not result:
self.add_failed(self.LDAP_USER_BASE)
for uid, data in self.fixtures.items():
data = data.copy()
parser.add_argument("--sslprotocol", type=native_str, help="SSL version for LDAP connection, can be SSLv23, TLSv1, TLSv1_1 or TLSv1_2")
args = parser.parse_args()
#Prompt for password if not set
authentication = None
if args.user is not None:
authentication = NTLM
if not '\\' in args.user:
print_f('Username must include a domain, use: DOMAIN\\username')
sys.exit(1)
if args.password is None:
args.password = getpass.getpass()
# define the server and the connection
s = Server(args.host, get_info=ALL)
if args.ssl:
s = Server(args.host, get_info=ALL, port=636, use_ssl=True)
if args.sslprotocol:
v = {'SSLv23' : 2, 'TLSv1' : 3, 'TLSv1_1' : 4, 'TLSv1_2' : 5}
if args.sslprotocol not in v.keys():
parser.print_help(sys.stderr)
sys.exit(1)
s = Server(args.host, get_info=ALL, port=636, use_ssl=True, tls=Tls(validate=0, version=v[args.sslprotocol]) )
if args.referralhosts:
s.allowed_referral_hosts = [('*', True)]
print_m('Connecting to host...')
c = Connection(s, user=args.user, password=args.password, authentication=authentication, auto_referrals=False)
print_m('Binding to host')
# perform the Bind operation
if not c.bind():
print_f('Could not bind with specified credentials')
def authenticate(self, request=None, ldap_id=None, ldap_pw=None):
user = User.objects.filter(profile__sparcs_id=ldap_id).first()
if not check_active_user(request, user):
return
# prevent LDAP injection attack
# the regex is taken from NAME_REGEX in adduser
if not re.match(r'^[a-z][-a-z0-9]*$', ldap_id):
return
ldap_server = ldap3.Server(
'ldap://sparcs.org', use_ssl=True, get_info=ldap3.ALL,
)
ldap_connection = ldap3.Connection(
ldap_server,
user=f'uid={ldap_id},ou=People,dc=sparcs,dc=org',
password=ldap_pw,
)
if not ldap_connection.bind():
return
return user
def init_ldap_client(self) -> None:
self.ldap_client = Server(self.ldap_server, get_info=ALL)
def run_ldaps(self):
connectTo = self.__target
if self.__targetIp is not None:
connectTo = self.__targetIp
try:
user = '%s\\%s' % (self.__domain, self.__username)
tls = ldap3.Tls(validate=ssl.CERT_NONE, version=ssl.PROTOCOL_TLSv1_2)
try:
ldapServer = ldap3.Server(connectTo, use_ssl=True, port=self.__port, get_info=ldap3.ALL, tls=tls)
if self.__doKerberos:
ldapConn = ldap3.Connection(ldapServer)
self.LDAP3KerberosLogin(ldapConn, self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash,
self.__aesKey, kdcHost=self.__kdcHost)
elif self.__hashes is not None:
ldapConn = ldap3.Connection(ldapServer, user=user, password=self.__hashes, authentication=ldap3.NTLM)
ldapConn.bind()
else:
ldapConn = ldap3.Connection(ldapServer, user=user, password=self.__password, authentication=ldap3.NTLM)
ldapConn.bind()
except ldap3.core.exceptions.LDAPSocketOpenError:
#try tlsv1
tls = ldap3.Tls(validate=ssl.CERT_NONE, version=ssl.PROTOCOL_TLSv1)
ldapServer = ldap3.Server(connectTo, use_ssl=True, port=self.__port, get_info=ldap3.ALL, tls=tls)
if self.__doKerberos:
from ldap3 import Server, Connection, ObjectDef, AttrDef, Reader, Writer, ALL
server = Server('ipa.demo1.freeipa.org', get_info=ALL)
conn = Connection(server, 'uid=admin,cn=users,cn=accounts,dc=demo1,dc=freeipa,dc=org', 'Secret123', auto_bind=True)
obj_person = ObjectDef('person', conn)
r = Reader(conn, obj_person, None, 'dc=demo1,dc=freeipa,dc=org')
r.search()
w = Writer.from_cursor(r)
print(w)
print(w[0])
def ldap_query(self, query):
if not self.ldap_enabled:
return None
from ldap3 import Server, Connection, SIMPLE, SYNC, ASYNC, SUBTREE, ALL, ALL_ATTRIBUTES
import json
try:
logging.debug("connecting to ldap server {} on port {}".format(self.ldap_server, self.ldap_port))
with Connection(
Server(self.ldap_server, port = self.ldap_port, get_info = ALL),
auto_bind = True,
client_strategy = SYNC,
user=self.ldap_bind_user,
password=self.ldap_bind_password,
authentication=SIMPLE,
check_names=True) as c:
logging.debug("running ldap query for ({})".format(query))
c.search(self.ldap_base_dn, '({})'.format(query), SUBTREE, attributes = ALL_ATTRIBUTES)
# a little hack to move the result into json
response = json.loads(c.response_to_json())
result = c.result
if len(response['entries']) < 1:
return None
except RuntimeError:
return {}
return {
'uri': conf['HSS_LDAP_URI'],
'userdn_format': conf['HSS_LDAP_USERDN_FORMAT'],
'system_bind': conf['HSS_LDAP_SYSTEM_BIND'],
'system_password': conf['HSS_LDAP_SYSTEM_PASSWORD'],
'search_base': conf['HSS_LDAP_SEARCH_BASE'],
}
class HssLdapConnector(BaseLdapConnector):
config = HssConfigProxy()
DEFAULT_SERVER_ARGS = {
'get_info': ldap3.ALL,
}
DEFAULT_CONNECT_ARGS = {
'auto_bind': True,
'client_strategy': ldap3.SYNC,
'authentication': ldap3.SIMPLE,
}
@classmethod
def get_system_bind_credentials(cls):
return cls.config['system_bind'], cls.config['system_password']
@classmethod
def get_bind_credentials(cls, username, password):
"""Return the according userdn of `username` and the given password"""
return cls.config['userdn_format'].format(user=username), password
def initConnection(self):
self.server = Server("ldap://%s:%s" % (self.targetHost, self.targetPort), get_info=ALL)
self.session = Connection(self.server, user="a", password="b", authentication=NTLM)
self.session.open(False)
return True
def __init__(self, ldap_host, ldap_user = None, ldap_password = None, ldap_use_ssl = None):
super().__init__()
self.m_server = Server(ldap_host, use_ssl = ldap_use_ssl, get_info = ALL)
self.m_connection = Connection(self.m_server, ldap_user, ldap_password)
self.m_connection.bind()