"mail": ["fake@nickwhyte.com"],
"website": ["http://www.nickwhyte.com"],
"sn": ["User"],
"givenname": ["Fake"],
"objectclass": ["person"],
"dn": "cn=Fake User,ou=users,dc=mydomain,dc=com",
"password": "fake321",
},
ldap3.utils.conv.escape_filter_chars("cn=Jane (admin)"): {
"cn": ["Jane Citizen"],
"mail": ["jane@jane.com"],
"website": ["http://www.janecitizen.com"],
"sn": ["Citizen"],
"givenname": ["Jane"],
"objectclass": ["person"],
"dn": ldap3.utils.conv.escape_filter_chars(
"cn=Jane (admin),ou=users,dc=mydomain,dc=com"
),
"password": "fake123",
},
},
"ou=groups": {
"cn=Staff": {
"cn": ["Staff"],
"description": ["A Group for staff"],
"uniqueMember": [
"cn=Nick Whyte,ou=users,dc=mydomain,dc=com",
"cn=Fake User,ou=users,dc=mydomain,dc=com",
],
"objectclass": ["group"],
"dn": "cn=Staff,ou=groups,dc=mydomain,dc=com",
},
def user_attrs(uid, connection=ldap.ldap_ocf, base=OCF_LDAP_PEOPLE):
    """Look up the LDAP entry for *uid* and return its attributes.

    A single ``(uid=...)`` search is run under ``base``; the uid is passed
    through ``escape_filter_chars`` first, so a value containing ``*``, ``(``,
    ``)`` or ``\\`` cannot change the shape of the filter.

    Args:
        uid: the account name to look up.
        connection: factory returning a context-managed ldap3 connection.
        base: search base DN.

    Returns:
        dict mapping attribute names to lists of values, for example::

            {
                'uid': ['somebody'],
                'objectClass': ['ocfAccount', 'account', 'posixAccount'],
                'loginShell': ['/bin/zsh'],
            }

        or ``None`` when no entry matches ``uid``.
    """
    search_filter = '(uid={})'.format(escape_filter_chars(uid))
    with connection() as conn:
        conn.search(base, search_filter, attributes=ldap3.ALL_ATTRIBUTES)
        results = conn.response
        # NOTE(review): only the first response item is used; if the server
        # can return search references alongside entries, confirm the first
        # item is always a real entry.
        if results:
            return results[0]['attributes']
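def getUserInfo(self, domainDumper, samname):
    """Resolve a sAMAccountName to its (DN, objectSid) under the domain root.

    Args:
        domainDumper: object whose ``root`` is the search base DN.
        samname: account name to look up. It is escaped with
            ``escape_filter_chars`` before being placed in the filter, so a
            value such as ``*`` or ``x)(objectClass=*`` cannot widen the search.

    Returns:
        ``(dn, sid)`` where ``sid`` is the ldap3 attribute object for
        ``objectSid`` (use ``.value`` for the string form), or ``False`` when
        no entry matched. The miss is also logged at error level.
    """
    found = self.client.search(
        domainDumper.root,
        '(sAMAccountName=%s)' % escape_filter_chars(samname),
        attributes=['objectSid'],
    )
    # search() returns False when the server rejected the query; treat that
    # the same as an empty result instead of trusting whatever ``entries``
    # currently holds, and check the list explicitly rather than relying on
    # IndexError.
    if not found or not self.client.entries:
        LOG.error('User not found in LDAP: %s' % samname)
        return False
    entry = self.client.entries[0]
    return (entry.entry_dn, entry['objectSid'])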
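def chown_dir_and_children(path_to_dir, username):
    """Recursively chown ``path_to_dir`` to the LDAP account ``username``.

    The account's ``uidNumber`` and ``gidNumber`` are read from a
    ``(&(objectClass=account)(uid=...))`` search under ``dc=netsoc,dc=co``;
    ``username`` is escaped with ``escape_filter_chars`` so it cannot alter
    the filter. Exactly one entry must match. Ownership of the directory and
    everything below it is then set to ``uidNumber:gidNumber`` with
    ``chown -R`` (the group is the user's own gidNumber, not a fixed group).

    Raises:
        Exception: if the search fails, does not return exactly one entry,
            or the returned ids are not integers.
        subprocess.CalledProcessError: if ``chown`` exits non-zero, so a
            partially re-owned tree is never reported as success.
    """
    logger.info(
        f"changing owner and group of directory {path_to_dir} and children",
    )
    ldap_server = ldap3.Server(config.LDAP_HOST, get_info=ldap3.ALL)
    with ldap3.Connection(ldap_server, auto_bind=True, **config.LDAP_AUTH) as conn:
        safe_username = ldap3.utils.conv.escape_filter_chars(username)
        success = conn.search(
            search_base="dc=netsoc,dc=co",
            search_filter=f"(&(objectClass=account)(uid={safe_username}))",
            attributes=["uidNumber", "gidNumber"],
        )
        if not success or len(conn.entries) != 1:
            raise Exception("user not found")
        entry = conn.entries[0]
        uidNumber = entry["uidNumber"].value
        gidNumber = entry["gidNumber"].value
    # Refuse a missing or non-numeric id instead of handing "None:None" or an
    # arbitrary string from the directory to chown.
    try:
        uid = int(uidNumber)
        gid = int(gidNumber)
    except (TypeError, ValueError) as exc:
        raise Exception(f"invalid uidNumber/gidNumber for user {username!r}") from exc
    # Argument list, no shell. "--" stops chown reading a path that begins
    # with "-" as an option. check=True surfaces a failed chown. stdout=PIPE
    # is deliberately not used: nothing reads the pipe, so the child could
    # block once the pipe buffer fills.
    subprocess.run(
        ["chown", "-R", f"{uid}:{gid}", "--", path_to_dir],
        check=True,
        stdout=subprocess.DEVNULL,
    )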
def validatePrivileges(self, uname, domainDumper):
# Find the user's DN
membersids = []
sidmapping = {}
privs = {
'create': False, # Whether we can create users
'createIn': None, # Where we can create users
'escalateViaGroup': False, # Whether we can escalate via a group
'escalateGroup': None, # The group we can escalate via
'aclEscalate': False, # Whether we can escalate via ACL on the domain object
'aclEscalateIn': None # The object which ACL we can edit
}
self.client.search(domainDumper.root, '(sAMAccountName=%s)' % escape_filter_chars(uname), attributes=['objectSid', 'primaryGroupId'])
user = self.client.entries[0]
usersid = user['objectSid'].value
sidmapping[usersid] = user.entry_dn
membersids.append(usersid)
# The groups the user is a member of
self.client.search(domainDumper.root, '(member:1.2.840.113556.1.4.1941:=%s)' % escape_filter_chars(user.entry_dn), attributes=['name', 'objectSid'])
LOG.debug('User is a member of: %s' % self.client.entries)
for entry in self.client.entries:
sidmapping[entry['objectSid'].value] = entry.entry_dn
membersids.append(entry['objectSid'].value)
# Also search by primarygroupid
# First get domain SID
self.client.search(domainDumper.root, '(objectClass=domain)', attributes=['objectSid'])
domainsid = self.client.entries[0]['objectSid'].value
gid = user['primaryGroupId'].value
# Now search for this group by SID
def get_dn(self, sam_name):
    """Return the distinguished name for *sam_name*.

    A value containing a comma is assumed to already be a DN and is handed
    back untouched (presumably because a sAMAccountName cannot contain ","
    — verify). Otherwise the domain root is searched for
    ``(sAMAccountName=<escaped sam_name>)`` and the first match's DN is
    returned; ``None`` means nothing matched.
    """
    if "," in sam_name:
        # NOTE(review): pass-through is neither validated nor escaped;
        # callers must treat the result as a DN, never as a filter value.
        return sam_name
    self.client.search(
        self.domain_dumper.root,
        '(sAMAccountName=%s)' % escape_filter_chars(sam_name),
        attributes=['objectSid'],
    )
    matches = self.client.entries
    return matches[0].entry_dn if matches else None
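def getUserInfo(self, domainDumper, samname):
    """Resolve a sAMAccountName to its (DN, objectSid) under the domain root.

    Args:
        domainDumper: object whose ``root`` is the search base DN.
        samname: account name to look up. It is escaped with
            ``escape_filter_chars`` before being placed in the filter, so a
            value such as ``*`` or ``x)(objectClass=*`` cannot widen the search.

    Returns:
        ``(dn, sid)`` where ``sid`` is the ldap3 attribute object for
        ``objectSid`` (use ``.value`` for the string form), or ``False`` when
        no entry matched. The miss is also logged at error level.
    """
    found = self.client.search(
        domainDumper.root,
        '(sAMAccountName=%s)' % escape_filter_chars(samname),
        attributes=['objectSid'],
    )
    # search() returns False when the server rejected the query; treat that
    # the same as an empty result instead of trusting whatever ``entries``
    # currently holds, and check the list explicitly rather than relying on
    # IndexError.
    if not found or not self.client.entries:
        LOG.error('User not found in LDAP: %s' % samname)
        return False
    entry = self.client.entries[0]
    return (entry.entry_dn, entry['objectSid'])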
sidmapping = {}
privs = {
'create': False, # Whether we can create users
'createIn': None, # Where we can create users
'escalateViaGroup': False, # Whether we can escalate via a group
'escalateGroup': None, # The group we can escalate via
'aclEscalate': False, # Whether we can escalate via ACL on the domain object
'aclEscalateIn': None # The object which ACL we can edit
}
self.client.search(domainDumper.root, '(sAMAccountName=%s)' % escape_filter_chars(uname), attributes=['objectSid', 'primaryGroupId'])
user = self.client.entries[0]
usersid = user['objectSid'].value
sidmapping[usersid] = user.entry_dn
membersids.append(usersid)
# The groups the user is a member of
self.client.search(domainDumper.root, '(member:1.2.840.113556.1.4.1941:=%s)' % escape_filter_chars(user.entry_dn), attributes=['name', 'objectSid'])
LOG.debug('User is a member of: %s' % self.client.entries)
for entry in self.client.entries:
sidmapping[entry['objectSid'].value] = entry.entry_dn
membersids.append(entry['objectSid'].value)
# Also search by primarygroupid
# First get domain SID
self.client.search(domainDumper.root, '(objectClass=domain)', attributes=['objectSid'])
domainsid = self.client.entries[0]['objectSid'].value
gid = user['primaryGroupId'].value
# Now search for this group by SID
self.client.search(domainDumper.root, '(objectSid=%s-%d)' % (domainsid, gid), attributes=['name', 'objectSid', 'distinguishedName'])
group = self.client.entries[0]
LOG.debug('User is a member of: %s' % self.client.entries)
# Add the group sid of the primary group to the list
sidmapping[group['objectSid'].value] = group.entry_dn
membersids.append(group['objectSid'].value)
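def getUserInfo(self, domainDumper, samname):
    """Resolve a sAMAccountName to its (DN, objectSid) under the domain root.

    Args:
        domainDumper: object whose ``root`` is the search base DN.
        samname: account name to look up. It is escaped with
            ``escape_filter_chars`` before being placed in the filter, so a
            value such as ``*`` or ``x)(objectClass=*`` cannot widen the search.

    Returns:
        ``(dn, sid)`` where ``sid`` is the ldap3 attribute object for
        ``objectSid`` (use ``.value`` for the string form), or ``False`` when
        no entry matched. The miss is also logged at error level.
    """
    found = self.client.search(
        domainDumper.root,
        '(sAMAccountName=%s)' % escape_filter_chars(samname),
        attributes=['objectSid'],
    )
    # search() returns False when the server rejected the query; treat that
    # the same as an empty result instead of trusting whatever ``entries``
    # currently holds, and check the list explicitly rather than relying on
    # IndexError.
    if not found or not self.client.entries:
        LOG.error('User not found in LDAP: %s' % samname)
        return False
    entry = self.client.entries[0]
    return (entry.entry_dn, entry['objectSid'])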