Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _create_binding(auth, mech, authzid=None, realm=None):
cfg = get_config()
host = "ldap://%s" % cfg["SERVER"]["hostname"]
client = LDAPClient(host)
client.set_credentials(
mech, cfg[auth]["user"], cfg[auth]["password"], realm, authzid
)
return client.connect()
def _generate_client(cfg):
url = "ldap://{host}:{port}/ou=nerdherd,{basedn}?{attr}?{scope}".format(
host=cfg["SERVER"]["hostip"],
port=cfg["SERVER"]["port"],
basedn=cfg["SERVER"]["basedn"],
attr=cfg["SERVER"]["search_attr"],
scope=cfg["SERVER"]["search_scope"],
)
client = LDAPClient(url)
client.set_credentials(
"SIMPLE", user=cfg["SIMPLEAUTH"]["user"], password=cfg["SIMPLEAUTH"]["password"]
)
client.auto_page_acquire = False
return client
def test_max_connection():
""" Test max_connection property. """
cli = LDAPClient("ldap://dummy.nfo")
pool = ConnectionPool(cli, minconn=5, maxconn=5)
assert pool.max_connection == 5
with pytest.raises(ValueError):
pool.max_connection = 4
pool.max_connection = 10
assert pool.max_connection == 10
universal_newlines=True,
)
output = " ".join(proc.communicate())
if "Heimdal" in output:
# Heimdal Kerberos implementation.
with tempfile.NamedTemporaryFile() as psw_tmp:
psw_tmp.write(password.encode())
psw_tmp.flush()
cmd = ["kinit", "--password-file=%s" % psw_tmp.name, user]
subprocess.check_call(cmd)
else:
# MIT Kerberos implementation.
cmd = 'echo "%s" | kinit %s' % (password, user)
subprocess.check_output(cmd, shell=True)
host = "ldap://%s" % cfg["SERVER"]["hostname"]
client = LDAPClient(host)
client.set_credentials(
"GSSAPI",
cfg["GSSAPIAUTH"]["user"],
cfg["GSSAPIAUTH"]["password"],
None,
authzid,
)
return client.connect()
except subprocess.CalledProcessError:
pytest.fail("Receiving TGT is failed.")
def test_references_prop(host_url):
""" Testing references property. """
client = LDAPClient(host_url)
reflist = [LDAPURL("ldap://localhost"), host_url]
ref = LDAPReference(client, reflist)
assert ref.references == reflist
with pytest.raises(ValueError):
ref.references = None
def test_digest_auth_error(cfg):
""" Test DIGEST-MD5 authentication error. """
client = LDAPClient("ldap://%s" % cfg["SERVER"]["hostname"])
if cfg["DIGESTAUTH"]["realm"] == "None":
realm = None
else:
realm = cfg["DIGESTAUTH"]["realm"].upper()
client.set_credentials(
"DIGEST-MD5", cfg["DIGESTAUTH"]["user"], "wrongpassword", realm, None
)
with pytest.raises(bonsai.AuthenticationError):
_ = client.connect()
def test_wrong_add_param(conn, ipaddr):
""" Test passing wrong parameter for add method. """
with pytest.raises(ClosedConnection):
cli = LDAPClient("ldap://%s" % ipaddr)
LDAPConnection(cli).add(bonsai.LDAPEntry("cn=dummy"))
with pytest.raises(TypeError):
conn.add("wrong")
def test_wrong_search_param(ipaddr):
""" Test passing wrong parameters for search method. """
with pytest.raises(ClosedConnection):
cli = LDAPClient("ldap://%s" % ipaddr)
LDAPConnection(cli).search()
with pytest.raises(ValueError):
cli = LDAPClient("ldap://%s" % ipaddr)
LDAPConnection(cli).open().search()
with pytest.raises(TypeError):
cli = LDAPClient("ldap://%s" % ipaddr)
LDAPConnection(cli).open().search("", 0, 3)
def test_password_modify_extop(conn, ipaddr):
""" Test Password Modify extended operation. """
user_dn = LDAPDN("cn=skip,ou=nerdherd,dc=bonsai,dc=test")
cli = LDAPClient("ldap://%s" % ipaddr)
cli.set_credentials("SIMPLE", str(user_dn), "p@ssword")
test_conn = cli.connect()
with pytest.raises(TypeError):
test_conn.modify_password(new_password=0)
test_conn.modify_password(user_dn, "newpassword", "p@ssword")
test_conn.close()
with pytest.raises(ClosedConnection):
test_conn.modify_password()
try:
cli.set_credentials("SIMPLE", str(user_dn), "newpassword")
cli.set_password_policy(True)
test_conn, ctrl = cli.connect()
newpass = test_conn.modify_password()
test_conn.close()
assert isinstance(newpass, str)
cli.set_credentials("SIMPLE", str(user_dn), newpass)