Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Test bootstrap: optionally reset and configure the log file, then print the
# effective test configuration.
# NOTE(review): indentation is lost in this view. Presumably the statements up
# to set_library_log_detail_level() form the body of `if test_logging:` and the
# print() calls run unconditionally -- confirm against the original file.
if test_logging:
try:
remove(test_logging_filename)
except OSError:
# an OSError raised by remove() is deliberately ignored
pass
import logging
# send DEBUG-level records to the test log file
logging.basicConfig(filename=test_logging_filename, level=logging.DEBUG)
set_library_log_activation_level(logging.DEBUG)
set_library_log_detail_level(test_log_detail)
# Summary of the test environment, one print() per topic.
print('Testing location:', location, ' - Test server:', test_server)
print('Python version:', version, ' - ldap3 version:', ldap3_version, ' - pyasn1 version:', pyasn1_version,)
print('Strategy:', test_strategy, '- Lazy:', test_lazy_connection, '- Check names:', test_check_names, '- Collect usage:', test_usage, ' - pool size:', test_pool_size)
print('Default client encoding:', get_config_parameter('DEFAULT_CLIENT_ENCODING'), ' - Default server encoding:', get_config_parameter('DEFAULT_SERVER_ENCODING'), '- Source encoding:', getdefaultencoding(), '- File encoding:', getfilesystemencoding(), ' - Additional server encodings:', ', '.join(get_config_parameter('ADDITIONAL_SERVER_ENCODINGS')))
# Prints 'False' instead of the log file name, and 'None' instead of the detail
# level name, when test_logging is falsy.
print('Logging:', 'False' if not test_logging else test_logging_filename, '- Log detail:', (get_detail_level_name(test_log_detail) if test_logging else 'None') + ' - Internal decoder: ', test_internal_decoder, ' - Response waiting timeout:', get_config_parameter('RESPONSE_WAITING_TIMEOUT'))
print()
def random_id():
    """Return a random identifier made of exactly five decimal digits.

    The value is drawn from ``SystemRandom`` and zero-padded, so the result
    is always a 5-character string containing only ``0``-``9``.
    """
    # Format a random integer rather than slicing the repr of a random float:
    # the tail of str(random()) can contain '.', 'e' or '-' (floats below 1e-4
    # are printed in exponent notation) and can be shorter than five
    # characters when the repr itself is short.
    return '%05d' % SystemRandom().randrange(100000)
def generate_dn(base, batch_id, name):
    """Build a DN of the form ``<test_name_attr>=<batch_id><name>,<base>``.

    ``test_name_attr`` is read from module scope; ``batch_id`` and ``name``
    are concatenated without a separator to form the naming value.
    """
    # leading relative component: naming attribute, '=', then the combined value
    relative_dn = test_name_attr + '=' + batch_id + name
    # append the container the entry lives in
    return relative_dn + ',' + base
def get_connection(bind=None,
use_ssl=None,
check_names=None,
lazy_connection=None,
# COMPARE operation of the connection object.
# NOTE(review): only the opening part of the method is visible here; the view
# ends on the schema-check condition and its body is missing.
def compare(self,
dn,
attribute,
value,
controls=None):
"""
Perform a compare operation
"""
# lower-cased attribute names that are exempt from the schema check below
conf_attributes_excluded_from_check = [v.lower() for v in get_config_parameter('ATTRIBUTES_EXCLUDED_FROM_CHECK')]
if log_enabled(BASIC):
log(BASIC, 'start COMPARE operation via <%s>', self)
# clear the error left by any previous operation
self.last_error = None
# with name checking enabled the DN is passed through safe_dn() first
if self.check_names:
dn = safe_dn(dn)
if log_enabled(EXTENDED):
log(EXTENDED, 'dn sanitized to <%s> for COMPARE operation via <%s>', dn, self)
# the attribute name is checked only when a server schema is available and
# name checking is enabled
if self.server and self.server.schema and self.check_names:
if ';' in attribute: # remove tags for checking
attribute_name_to_check = attribute.split(';')[0]
else:
attribute_name_to_check = attribute
# condition is true when the name is neither excluded from the check nor
# present in the schema's attribute types
if self.server.schema.attribute_types and attribute_name_to_check.lower() not in conf_attributes_excluded_from_check and attribute_name_to_check not in self.server.schema.attribute_types:
# (body of this branch is not visible in this view)
# Checks the value of attribute `name` against `schema`.
# NOTE(review): only the opening part of the function is visible here; the
# view ends on a `try:` whose body is missing. Indentation is lost, so the
# pairing of the if/elif/else branches below should be confirmed in the file.
def validate_attribute_value(schema, name, value, auto_encode, validator=None, check_names=False):
# configuration lists; the three comprehensions lower-case every entry
conf_classes_excluded_from_check = [v.lower() for v in get_config_parameter('CLASSES_EXCLUDED_FROM_CHECK')]
conf_attributes_excluded_from_check = [v.lower() for v in get_config_parameter('ATTRIBUTES_EXCLUDED_FROM_CHECK')]
conf_utf8_syntaxes = get_config_parameter('UTF8_ENCODED_SYNTAXES')
conf_utf8_types = [v.lower() for v in get_config_parameter('UTF8_ENCODED_TYPES')]
# nothing below runs without a schema that carries attribute types
if schema and schema.attribute_types:
# keep only the part of the name before the first ';'
if ';' in name:
name = name.split(';')[0]
# objectClass values: reject a class that is neither excluded from the check
# nor present in the schema's object classes
if check_names and schema.object_classes and name.lower() == 'objectclass':
if to_unicode(value).lower() not in conf_classes_excluded_from_check and to_unicode(value) not in schema.object_classes:
raise LDAPObjectClassError('invalid class in objectClass attribute: ' + str(value))
# any other attribute: reject a name that is neither in the schema nor excluded
elif check_names and name not in schema.attribute_types and name.lower() not in conf_attributes_excluded_from_check:
raise LDAPAttributeError('invalid attribute ' + name)
else: # try standard validators
validator = find_attribute_validator(schema, name, validator)
validated = validator(value)
# an explicit False (identity test) from the validator triggers the retry below
if validated is False:
try: # checks if the value is a byte value erroneously converted to a string (as "b'1234'"), this is a common case in Python 3 when encoding is not specified
# (remainder of the function is not visible in this view)
def __init__(self, ldap_connection):
    """Initialise the strategy on top of ``BaseStrategy``.

    Marks the strategy as synchronous, switches off the ``no_real_dsa``,
    ``pooled`` and ``can_stream`` flags, and reads the socket size from the
    ``SOCKET_SIZE`` configuration parameter.
    """
    BaseStrategy.__init__(self, ldap_connection)
    # capability flags of this strategy
    self.sync = True
    self.no_real_dsa = self.pooled = self.can_stream = False
    # socket size comes from the library configuration
    socket_size = get_config_parameter('SOCKET_SIZE')
    self.socket_size = socket_size
# Scans the pool's server states, beginning at index `starting`.
# NOTE(review): only the opening part of the method is visible here; the view
# ends inside the "server is offline" branch.
def find_active_server(self, starting):
# read here; its use is not visible in this view
conf_pool_timeout = get_config_parameter('POOLING_LOOP_TIMEOUT')
counter = self.server_pool.active # can be True for "forever" or the number of cycles to try
# an out-of-range starting index falls back to the first server
if starting >= len(self.server_states):
starting = 0
while counter:
if log_enabled(NETWORK):
log(NETWORK, 'entering loop number <%s> for finding active server in pool <%s>', counter, self)
# index runs 0 .. pool_size - 1 (incremented at the top of the inner loop)
index = -1
pool_size = len(self.server_states)
while index < pool_size - 1:
index += 1
# position to probe: starting + index, wrapped around the end of the pool
offset = index + starting if index + starting < pool_size else index + starting - pool_size
server_state = self.server_states[offset]
if not server_state.available: # server is offline
# exhaust is either the boolean True or a number compared with the seconds
# elapsed since last_checked_time
# NOTE(review): timedelta.seconds is only the seconds component and does not
# include whole days -- confirm this is intended rather than total_seconds()
if (isinstance(self.server_pool.exhaust, bool) and self.server_pool.exhaust) or (datetime.now() - server_state.last_checked_time).seconds < self.server_pool.exhaust: # keeps server offline
if log_enabled(NETWORK):
# (remainder of the method is not visible in this view)
# NOTE(review): only the opening part of the method is visible here; the view
# ends inside the `try:` that closes a server-terminated session.
def get_response(self, message_id, timeout=None, get_request=False):
"""
Get response LDAP messages
Responses are returned by the underlying connection strategy
Check if message_id LDAP message is still outstanding and wait for timeout to see if it appears in _get_response
Result is stored in connection.result
Responses without result is stored in connection.response
A tuple (responses, result) is returned
"""
# pause between two polls of _get_response()
conf_sleep_interval = get_config_parameter('RESPONSE_SLEEPTIME')
# no explicit timeout: use the configured default
if timeout is None:
timeout = get_config_parameter('RESPONSE_WAITING_TIMEOUT')
response = None
result = None
request = None
# poll only when message_id is among the outstanding requests
if self._outstanding and message_id in self._outstanding:
while timeout >= 0: # waiting for completed message to appear in responses
responses = self._get_response(message_id)
# nothing yet: sleep one interval, subtract it from the remaining timeout
# and poll again
if not responses:
sleep(conf_sleep_interval)
timeout -= conf_sleep_interval
continue
if responses == SESSION_TERMINATED_BY_SERVER:
try: # try to close the session but don't raise any error if server has already closed the session
self.close()
# (remainder of the method is not visible in this view)
# NOTE(review): only the opening part of the function is visible here; the
# view ends inside the fallback loop over the additional server encodings.
def to_unicode(obj, encoding=None, from_server=False):
"""Try to convert bytes (and str in python2) to unicode.
Return object unmodified if python3 string, else raise an exception
"""
# encoding settings; the two client ones are not used in the visible part
conf_default_client_encoding = get_config_parameter('DEFAULT_CLIENT_ENCODING')
conf_default_server_encoding = get_config_parameter('DEFAULT_SERVER_ENCODING')
conf_additional_server_encodings = get_config_parameter('ADDITIONAL_SERVER_ENCODINGS')
conf_additional_client_encodings = get_config_parameter('ADDITIONAL_CLIENT_ENCODINGS')
# numeric values are converted with str() before anything else
if isinstance(obj, NUMERIC_TYPES):
obj = str(obj)
if isinstance(obj, (bytes, bytearray)):
if from_server: # data from server
# no explicit encoding: use the default server encoding
if encoding is None:
encoding = conf_default_server_encoding
try:
return obj.decode(encoding)
except UnicodeDecodeError:
# first decode failed: try each additional server encoding in turn and
# return the first successful decode
for encoding in conf_additional_server_encodings: # AD could have DN not encoded in utf-8 (even if this is not allowed by RFC4510)
try:
return obj.decode(encoding)
except UnicodeDecodeError:
pass
# (remainder of the function is not visible in this view)
# NOTE(review): only the opening part of the method is visible here; the view
# ends right after the post_query assignment.
def _get_attributes(self, response, attr_defs, entry):
"""Assign the result of the LDAP query to the Entry object dictionary.
If the optional 'post_query' callable is present in the AttrDef it is called with each value of the attribute and the callable result is stored in the attribute.
Returns the default value for missing attributes.
If the 'dereference_dn' in AttrDef is a ObjectDef then the attribute values are treated as distinguished name and the relevant entry is retrieved and stored in the attribute value.
"""
# both settings are read here; their use is not visible in this view
conf_operational_attribute_prefix = get_config_parameter('ABSTRACTION_OPERATIONAL_ATTRIBUTE_PREFIX')
conf_attributes_excluded_from_object_def = [v.lower() for v in get_config_parameter('ATTRIBUTES_EXCLUDED_FROM_OBJECT_DEF')]
attributes = CaseInsensitiveWithAliasDict()
used_attribute_names = set()
for attr in attr_defs:
attr_def = attr_defs[attr]
# find the response key whose lower-cased form equals the lower-cased
# definition name; stays None when there is no match
attribute_name = None
for attr_name in response['attributes']:
if attr_def.name.lower() == attr_name.lower():
attribute_name = attr_name
break
if attribute_name or attr_def.default is not NotImplemented: # attribute value found in result or default value present - NotImplemented allows use of None as default
attribute = self.attribute_class(attr_def, entry, self)
attribute.response = response
# raw values are taken from the response only when a matching key was found
attribute.raw_values = response['raw_attributes'][attribute_name] if attribute_name else None
# post_query runs only when the definition has one, the definition name is
# in response['attributes'], and raw_attributes is not an empty list
# NOTE(review): this membership test uses attr_def.name as written while the
# lookup above matches case-insensitively -- confirm both agree
if attr_def.post_query and attr_def.name in response['attributes'] and response['raw_attributes'] != list():
attribute.values = attr_def.post_query(attr_def.key, response['attributes'][attribute_name])
# (remainder of the method is not visible in this view)
if p is not None]
# NOTE(review): tail of a constructor whose header and the construction of
# group_params are not visible in this view.
# The group settings are all-or-nothing: group_params must hold either zero
# or three entries.
if len(group_params) not in (0, 3):
raise ValueError(
"Incomplete LDAP groups configuration. "
"To use Ldap groups, you need to specify the three "
"parameters (groupName, groupMemberPattern and groupBase). ")
# store the configuration on the instance
self.groupName = groupName
self.groupMemberPattern = groupMemberPattern
self.groupBase = groupBase
self.avatarPattern = avatarPattern
self.avatarData = avatarData
# None is replaced by a fresh empty list
if accountExtraFields is None:
accountExtraFields = []
self.accountExtraFields = accountExtraFields
# encoding name taken from ldap3's DEFAULT_SERVER_ENCODING parameter
self.ldap_encoding = ldap3.get_config_parameter('DEFAULT_SERVER_ENCODING')