Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_write_entry():
    """Serialise an LDAP entry with LDIFWriter and collect its surnames."""
    entry = LDAPEntry("cn=test")
    entry["cn"] = "test"
    entry["jpegPhoto"] = b"1223122130008283938282931232"
    # The first surname contains a non-ASCII character; the values are read
    # back below by base64-decoding the part after ":: ".
    entry["sn"] = "test😊"
    entry["sn"].append(" test2")
    with StringIO() as buffer:
        writer = LDIFWriter(buffer, max_length=32)
        writer.write_entry(entry)
        # Grab the text before the buffer is closed by the with-block.
        content = buffer.getvalue()
    contlines = content.split("\n")
    surnames = set()
    for line in contlines:
        if "sn" in line:
            encoded = line.split(":: ")[1]
            surnames.add(b64decode(encoded).decode("UTF-8"))
    jpeg_lines = []
def test_binary(client, basedn):
    """
    Test adding binary data.

    Stores the bytes of ``testenv/test.jpeg`` (located next to this file) in
    the ``jpegPhoto`` attribute of a new entry, adds the entry, and checks
    that a search on the same DN returns the identical value.

    :param client: object whose ``connect()`` returns the connection to use.
    :param basedn: base DN under which the ``cn=binary`` entry is created.
    """
    curdir = os.path.abspath(os.path.dirname(__file__))
    dname = "cn=binary,%s" % basedn
    entry = LDAPEntry(dname)
    entry["objectclass"] = ["top", "inetOrgPerson"]
    entry["sn"] = "binary_test"
    # Read the image before connecting, so a missing file cannot leave an
    # open connection behind.
    with open(os.path.join(curdir, "testenv", "test.jpeg"), "rb") as image:
        entry["jpegPhoto"] = image.read()
    # The with-block releases the connection even when a step below fails.
    with client.connect() as conn:
        conn.add(entry)
        try:
            result = conn.search(dname, 0)
        finally:
            # Always remove the entry, so a failed search does not leave it
            # on the server for later runs.
            entry.delete()
    assert "jpegPhoto" in result[0].keys()
    assert result[0]["jpegphoto"][0] == entry["jpegphoto"][0]
def test_wrong_params():
    """Check the errors raised for invalid LDAPEntry constructor arguments."""
    # Each case pairs the expected exception with the DN string passed as the
    # first argument; the second argument is the integer 1 in both cases.
    cases = (
        (TypeError, ""),
        (InvalidDN, "5"),
    )
    for expected_error, dn in cases:
        with pytest.raises(expected_error):
            LDAPEntry(dn, 1)
def test_sync_operations(client, basedn):
    """
    Exercise the synchronous add, modify and delete operations of LDAPEntry.
    """
    entry = LDAPEntry("cn=test,%s" % basedn)
    # modify() is called here before the entry has been handed to any
    # connection; a ValueError is expected.
    with pytest.raises(ValueError):
        entry.modify()
    with client.connect() as conn:
        entry["sn"] = "test"
        # Only "sn" is set at this point; the add must be rejected.
        with pytest.raises(bonsai.ObjectClassViolation):
            conn.add(entry)
        object_classes = [
            "top",
            "inetOrgPerson",
            "person",
            "organizationalPerson",
        ]
        entry["objectclass"] = object_classes
        try:
            conn.add(entry)
        except bonsai.AlreadyExists:
            # An entry with this DN is already present: remove it.
            conn.delete(entry.dn)
def test_clear_attribute_changes():
    """Verify that clear_attribute_changes resets the tracked changes."""
    attr = "uidNumber"
    entry = LDAPEntry("cn=sam,ou=nerdherd,dc=bonsai,dc=test")

    # Operation code 0: the value must show up in the `added` list.
    entry.change_attribute(attr, 0, 4)
    assert entry[attr].added == [4]

    # Operation code 1: the value must show up in the `deleted` list.
    entry.change_attribute(attr, 1, 4)
    assert entry[attr].deleted == [4]

    # After clearing, the status is 0 and both change lists are empty.
    entry.clear_attribute_changes(attr)
    assert entry[attr].status == 0
    assert entry[attr].added == []
    assert entry[attr].deleted == []
def test_modify_and_rename(self):
    """Test modifying and renaming an LDAP entry (generator-style test)."""
    with (yield self.client.connect(True, ioloop=self.io_loop)) as conn:
        # Both DNs live under the same base; only the cn differs.
        oldname = "cn=async_test,%s" % self.basedn
        newname = "cn=async_test2,%s" % self.basedn
        entry = LDAPEntry(oldname)
        object_classes = [
            "top",
            "inetOrgPerson",
            "person",
            "organizationalPerson",
        ]
        entry["objectclass"] = object_classes
        entry["sn"] = "async_test"
        # Delete whatever the search finds under the new name.
        found = yield conn.search(newname, 0)
        if found:
            yield found[0].delete()
        try:
            yield conn.add(entry)
        except bonsai.errors.AlreadyExists:
            # An entry with the old name already exists: remove it.
            yield conn.delete(entry.dn)
def test_pop():
    """Exercise LDAPEntry.pop with invalid, missing and present keys."""
    entry = LDAPEntry("cn=test")
    entry["test"] = "test"

    # Zero arguments and three arguments must both raise TypeError.
    for bad_args in ((), ("t", 2, 3)):
        with pytest.raises(TypeError):
            entry.pop(*bad_args)

    # A key that was never set, with no default, must raise KeyError.
    with pytest.raises(KeyError):
        entry.pop("t")

    popped = entry.pop("test")
    assert popped == ["test"]
    # Popping the same key again with a default of None must return None.
    assert entry.pop("test", None) is None
def test_connection(client, basedn):
    """
    Test set and get connection object from LDAPEntry.

    :param client: object whose ``connect()`` returns the connection to attach.
    :param basedn: base DN used to build the entry's DN.
    """
    entry = LDAPEntry("cn=test,%s" % basedn)
    # Reading the property before any connection was assigned must fail.
    with pytest.raises(ValueError):
        _ = entry.connection
    # Use the connection as a context manager so it is released when the test
    # ends, including when one of the checks below fails.
    with client.connect() as conn:
        entry.connection = conn
        assert entry.connection == conn
        # A plain string is not accepted as a connection.
        with pytest.raises(TypeError):
            entry.connection = "string"
        # The property must not be deletable.
        with pytest.raises(TypeError):
            del entry.connection
def test_modify_referrals(client):
    """
    Test modifying an LDAP referral with ManageDsaIT control.

    Adds a new ``ref`` value to the referral entry, checks that a search
    returns it, and removes the value again so the entry is left as it was.

    :param client: configured client whose ``url``, ``mechanism`` and
        ``credentials`` are copied into a fresh LDAPClient.
    """
    refdn = bonsai.LDAPDN("o=invalid-ref,ou=nerdherd,dc=bonsai,dc=test")
    newref = "ldap://invalid.host/cn=nobody"
    # Build a separate client so the flag set below does not touch the
    # client object that was passed in.
    cli = LDAPClient(client.url)
    cli.set_credentials(client.mechanism, **client.credentials)
    cli.managedsait = True
    with cli.connect() as conn:
        entry = LDAPEntry(refdn, conn)
        entry.change_attribute("ref", LDAPModOp.ADD, newref)
        entry.modify()
        try:
            res = conn.search(refdn, 0, attrlist=["ref"])[0]
            assert len(res["ref"]) == 3
            assert newref in res["ref"]
        finally:
            # Undo the addition even when a check above fails; otherwise the
            # extra value stays on the server and breaks the length check on
            # the next run.
            entry.change_attribute("ref", LDAPModOp.DELETE, newref)
            entry.modify()
def test_items():
    """Check LDAPEntry.items with and without the DN pair."""
    entry = LDAPEntry("cn=test")
    entry["cn"] = "test"
    entry["sn"] = "Test"
    attr_names = ("cn", "sn")

    # Default call: the ("dn", ...) pair plus one pair per attribute.
    assert len(entry.items()) == 3
    assert ("dn", entry.dn) in entry.items()
    for name in attr_names:
        assert (name, entry[name]) in entry.items()

    # exclude_dn=True: the DN pair is left out, the attributes remain.
    assert len(list(entry.items(exclude_dn=True))) == 2
    assert ("dn", entry.dn) not in entry.items(exclude_dn=True)
    for name in attr_names:
        assert (name, entry[name]) in entry.items(exclude_dn=True)