Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_raises_size_limit_exceeded_exception(self):
    """Check that a size-limited search raises ``LDAPSizeLimitExceededResult``.

    Three entries are added to a mock connection created with
    ``raise_exceptions=True``; a search limited to a single result is then
    expected to raise instead of returning normally.
    """
    connection = Connection(self.server, user='cn=user1,ou=test', password='test1', client_strategy=MOCK_SYNC, raise_exceptions=True)
    # create fixtures
    fixtures = (
        ('cn=user1,ou=test', {'userPassword': 'test1', 'revision': 1}),
        ('cn=user2,ou=test', {'userPassword': 'test2', 'revision': 2}),
        ('cn=user3,ou=test', {'userPassword': 'test3', 'revision': 3}),
    )
    for entry_dn, entry_attributes in fixtures:
        connection.strategy.add_entry(entry_dn, entry_attributes)
    connection.bind()
    with self.assertRaises(LDAPSizeLimitExceededResult):
        connection.search('ou=test', '(cn=*)', size_limit=1)
def test_start_tls_with_cipher(self):
    """Open a connection and upgrade it with StartTLS using a restricted cipher list.

    Nothing is executed for the mock strategies. When ``test_server`` is a
    list or tuple a ``ServerPool`` is built, otherwise a single ``Server``;
    in both cases every server carries a ``Tls`` object configured with the
    cipher string under test. After ``start_tls()`` the connection must not
    be closed.
    """
    # ciphers = None
    # ciphers = '!aNULL:!eNULL:!LOW:!EXPORT:!SSLv2'
    ciphers = 'HIGH:!aNULL:!RC4:!DSS'
    if test_strategy in [MOCK_SYNC, MOCK_ASYNC]:
        return

    if isinstance(test_server, (list, tuple)):
        server = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
        for host in test_server:
            # Pooled servers get the same cipher-restricted Tls as the
            # single-server branch, so the cipher list is exercised here too.
            server.add(Server(host=host, port=test_port, allowed_referral_hosts=('*', True), tls=Tls(validate=ssl.CERT_NONE, ciphers=ciphers), get_info=test_get_info, mode=test_server_mode))
    else:
        server = Server(host=test_server, port=test_port, tls=Tls(validate=ssl.CERT_NONE, ciphers=ciphers), get_info=test_get_info, mode=test_server_mode)
    connection = Connection(server, auto_bind=False, version=3, client_strategy=test_strategy, user=test_user, password=test_password, authentication=test_authentication, lazy=test_lazy_connection, pool_name='pool1')
    try:
        connection.open()
        connection.start_tls()
        self.assertFalse(connection.closed)
        # self.assertEqual(connection.socket.cipher(), ciphers)
    finally:
        # Always release the connection (and the pooled strategy, if any),
        # even when opening, StartTLS or the assertion fails.
        connection.unbind()
        if connection.strategy.pooled:
            connection.strategy.terminate()
def setUp(self):
    """Create mock connections against three differently-defined servers and seed fixtures.

    For each server three ``MOCK_SYNC`` connections are stored on the test
    case: a plain one, a second identical one (``b`` suffix) and one created
    with ``raise_exceptions=True`` (``c`` suffix). A fresh ``testcase_id`` is
    stored in the module-level global.
    """
    global testcase_id
    testcase_id = random_id()

    def build_connection_set(server, user_dn, secret):
        # Same creation order as the attribute names: plain, 'b', 'c'.
        plain = Connection(server, user=user_dn, password=secret, client_strategy=MOCK_SYNC)
        twin = Connection(server, user=user_dn, password=secret, client_strategy=MOCK_SYNC)
        raising = Connection(server, user=user_dn, password=secret, client_strategy=MOCK_SYNC, raise_exceptions=True)
        return plain, twin, raising

    # The mock server can be defined in two different ways, so tests are duplicated, connection_3 is without schema
    schema = SchemaInfo.from_json(edir_9_1_4_schema)
    info = DsaInfo.from_json(edir_9_1_4_dsa_info, schema)
    server_1 = Server.from_definition('MockSyncServer', info, schema)
    self.connection_1, self.connection_1b, self.connection_1c = build_connection_set(server_1, 'cn=user1,ou=test,o=lab', 'test1111')

    server_2 = Server('dummy', get_info=OFFLINE_EDIR_9_1_4)
    self.connection_2, self.connection_2b, self.connection_2c = build_connection_set(server_2, 'cn=user2,ou=test,o=lab', 'test2222')

    server_3 = Server('dummy')  # no schema
    self.connection_3, self.connection_3b, self.connection_3c = build_connection_set(server_3, 'cn=user3,ou=test,o=lab', 'test3333')

    # creates fixtures
    for target in (self.connection_1, self.connection_2, self.connection_3):
        # A new dict is built on every iteration so no attribute mapping is shared.
        target.strategy.add_entry('cn=user0,o=lab', {'userPassword': 'test0000', 'sn': 'user0_sn', 'revision': 0, 'guid': '07039e68-4373-264d-a0a7-000000000000'})
    self.connection_1.strategy.add_entry('cn=user1,ou=test,o=lab', {'userPassword': 'test1111', 'sn': 'user1_sn', 'revision': 1, 'guid': '07039e68-4373-264d-a0a7-111111111111'})
"mail": ["fake@nickwhyte.com"],
"website": ["http://www.nickwhyte.com"],
"sn": ["User"],
"givenname": ["Fake"],
"objectclass": ["person"],
"dn": "cn=Fake User,ou=users,dc=mydomain,dc=com",
"password": "fake321",
},
ldap3.utils.conv.escape_filter_chars("cn=Jane (admin)"): {
"cn": ["Jane Citizen"],
"mail": ["jane@jane.com"],
"website": ["http://www.janecitizen.com"],
"sn": ["Citizen"],
"givenname": ["Jane"],
"objectclass": ["person"],
"dn": ldap3.utils.conv.escape_filter_chars(
"cn=Jane (admin),ou=users,dc=mydomain,dc=com"
),
"password": "fake123",
},
},
"ou=groups": {
"cn=Staff": {
"cn": ["Staff"],
"description": ["A Group for staff"],
"uniqueMember": [
"cn=Nick Whyte,ou=users,dc=mydomain,dc=com",
"cn=Fake User,ou=users,dc=mydomain,dc=com",
],
"objectclass": ["group"],
"dn": "cn=Staff,ou=groups,dc=mydomain,dc=com",
},
def test_modify_value_in_case_insentitive_dict_different_case_key(self):
    """Overwrite a value through a differently-cased key and read it back.

    After the overwrite, every casing of the modified key must return the
    new value, while the other keys keep their original values.
    """
    mapping = CaseInsensitiveDict()
    for key, value in (('oNe', 1), ('tWo', 2), (3, 3)):
        mapping[key] = value

    for spelling in ('ONE', 'oNe'):
        self.assertEqual(mapping[spelling], 1)

    # Overwrite using a casing that differs from the one used at insertion.
    mapping['one'] = 'ONE'

    for spelling in ('ONE', 'oNe'):
        self.assertEqual(mapping[spelling], 'ONE')
    for spelling in ('TWO', 'two'):
        self.assertEqual(mapping[spelling], 2)
    self.assertEqual(mapping[3], 3)
def test_add_values_to_case_insentitive_dict(self):
    """Insert mixed-case string keys and an integer key, then read them back.

    String keys are looked up in upper and lower case; the integer key is
    looked up unchanged.
    """
    mapping = CaseInsensitiveDict()
    for key, value in (('oNe', 1), ('tWo', 2), (3, 3)):
        mapping[key] = value

    expectations = (
        ('ONE', 1),
        ('one', 1),
        ('TWO', 2),
        ('two', 2),
        (3, 3),
    )
    for lookup_key, expected in expectations:
        self.assertEqual(mapping[lookup_key], expected)
def test_case_insensitive_dict_contains_same_case_key(self):
    """Check membership with the exact key casing used at insertion.

    A key spelled as it was inserted must be reported as present; a string
    and an integer that were never inserted must be reported as absent.
    """
    cid = CaseInsensitiveDict()
    cid['oNe'] = 1
    cid['tWo'] = 2
    cid[3] = 3
    # assertIn/assertNotIn use the same ``in`` operator as before but show
    # the key and the container in the failure message.
    self.assertIn('oNe', cid)
    self.assertNotIn('THREE', cid)
    self.assertNotIn(4, cid)
def test_create_empty_case_insensitive_dict(self):
    """Check that ``CaseInsensitiveDict()`` can be constructed without arguments."""
    cid = CaseInsensitiveDict()
    # assertIsInstance reports the actual type on failure, unlike
    # assertTrue(isinstance(...)) which only reports "False is not true".
    self.assertIsInstance(cid, CaseInsensitiveDict)
def test_equality_case_insensitive_dict_with_different_case(self):
    """Two dicts whose string keys differ only in case must compare equal."""
    lower_cased = CaseInsensitiveDict()
    for key, value in (('one', 1), ('two', 2), (3, 3)):
        lower_cased[key] = value

    upper_cased = CaseInsensitiveDict()
    for key, value in (('ONE', 1), ('TWO', 2), (3, 3)):
        upper_cased[key] = value

    self.assertEqual(lower_cased, upper_cased)
def test_start_tls(self):
    """Open a connection, upgrade it with StartTLS and check it stays open.

    Nothing is executed for the mock strategies. When ``test_server`` is a
    list or tuple a ``ServerPool`` is built from its hosts, otherwise a
    single ``Server`` with a non-validating ``Tls`` configuration is used.
    """
    if test_strategy in [MOCK_SYNC, MOCK_ASYNC]:
        return

    if isinstance(test_server, (list, tuple)):
        server = ServerPool(pool_strategy=test_pooling_strategy, active=test_pooling_active, exhaust=test_pooling_exhaust)
        for host in test_server:
            server.add(Server(host=host, port=test_port, allowed_referral_hosts=('*', True), get_info=test_get_info, mode=test_server_mode))
    else:
        server = Server(host=test_server, port=test_port, tls=Tls(validate=ssl.CERT_NONE), get_info=test_get_info, mode=test_server_mode)
    connection = Connection(server, auto_bind=False, version=3, client_strategy=test_strategy, user=test_user, password=test_password, authentication=test_authentication, lazy=test_lazy_connection, pool_name='pool1')
    try:
        connection.open()
        connection.start_tls()
        self.assertFalse(connection.closed)
    finally:
        # Always release the connection (and the pooled strategy, if any),
        # even when opening, StartTLS or the assertion fails.
        connection.unbind()
        if connection.strategy.pooled:
            connection.strategy.terminate()