Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def opt_service_location(self, value):
"""Service location, in the form BASEDN:HOST[:PORT]"""
if not self.opts.has_key('service-location'):
self.opts['service-location']={}
base, location = value.split(':', 1)
try:
dn = distinguishedname.DistinguishedName(base)
except distinguishedname.InvalidRelativeDistinguishedName, e:
raise usage.UsageError, str(e)
if not location:
raise usage.UsageError, "service-location must specify host"
if ':' in location:
host, port = location.split(':', 1)
else:
host, port = location, None
if not host:
host = None
if not port:
port = None
def __init__(self,
baseDN,
serviceLocationOverrides=None):
self.baseDN = distinguishedname.DistinguishedName(baseDN)
self.serviceLocationOverrides = {}
if serviceLocationOverrides is not None:
for k,v in serviceLocationOverrides.items():
dn = distinguishedname.DistinguishedName(k)
self.serviceLocationOverrides[dn]=v
def _gotUPNResult(results):
if len(results) != 1:
# Not exactly one result, so this might not be an UNP.
return distinguishedname.DistinguishedName(request.dn)
# A single result, so the UPN might exist.
return results[0].dn
if '@' in request.dn and ',' not in request.dn:
# This might be an UPN request.
filterText = b'(' + self._loginAttribute + b'=' + request.dn + b')'
d = root.search(filterText=filterText)
d.addCallback(_gotUPNResult)
else:
d = defer.succeed(distinguishedname.DistinguishedName(request.dn))
# Once the BIND DN is known, search for the LDAP entry.
d.addCallback(lambda dn: root.lookup(dn))
def _noEntry(fail):
"""
Called when the requested BIND DN was not found.
"""
fail.trap(ldaperrors.LDAPNoSuchObject)
return None
d.addErrback(_noEntry)
def _gotEntry(entry, auth):
"""
Called when the requested BIND DN was found.
"""
def __init__(self,
baseDN,
serviceLocationOverrides=None):
self.baseDN = distinguishedname.DistinguishedName(baseDN)
self.serviceLocationOverrides = {}
if serviceLocationOverrides is not None:
for k,v in serviceLocationOverrides.items():
dn = distinguishedname.DistinguishedName(k)
self.serviceLocationOverrides[dn]=v
def handle_LDAPBindRequest(self, request, controls, reply):
if request.version != 3:
raise ldaperrors.LDAPProtocolError(
'Version %u not supported' % request.version)
self.checkControls(controls)
if request.dn == b'':
# anonymous bind
self.boundUser = None
return pureldap.LDAPBindResponse(resultCode=0)
else:
dn = distinguishedname.DistinguishedName(request.dn)
root = interfaces.IConnectedLDAPEntry(self.factory)
d = root.lookup(dn)
def _noEntry(fail):
fail.trap(ldaperrors.LDAPNoSuchObject)
return None
d.addErrback(_noEntry)
def _gotEntry(entry, auth):
if entry is None:
raise ldaperrors.LDAPInvalidCredentials()
d = entry.bind(auth)
def _cb(entry):
self.boundUser = entry
def coerce(self, *a, **kw):
val = super(LDAPDN, self).coerce(*a, **kw)
try:
dn = distinguishedname.DistinguishedName(stringValue=val)
except distinguishedname.InvalidRelativeDistinguishedName, e:
raise annotate.InputError, \
"%r is not a valid LDAP DN: %s" % (val, e)
return dn
def add(self, context, **kw):
cfg = context.locate(interfaces.ILDAPConfig)
dnAttr = self._getDNAttr()
assert kw.has_key('add_'+dnAttr), 'Must have attribute dn %s points to.' % dnAttr
assert kw['add_'+dnAttr], 'Attribute %s must have value.' % 'add_'+dnAttr
# TODO ugly
rdn=distinguishedname.RelativeDistinguishedName(
attributeTypesAndValues=[
distinguishedname.LDAPAttributeTypeAndValue(attributeType=dnAttr,
value=kw['add_'+dnAttr]),
])
#TODO verify
changes = []
for k,v in kw.items():
if hasattr(self, "nonUserEditableAttributeType_"+k):
raise "Can't set attribute %s when adding." % k
elif k[:len("add_")]=="add_":
if not v:
continue
attrtype = self._get_attrtype(k[len("add_"):])
assert attrtype
def childFactory(self, ctx, name):
unquoted=uriUnquote(name)
try:
dn = distinguishedname.DistinguishedName(stringValue=unquoted)
except distinguishedname.InvalidRelativeDistinguishedName, e:
# TODO There's no way to throw a FormException at this stage.
return None
r=ConfirmChange(dn=dn)
return r
def coerce(self, *a, **kw):
val = super(LDAPDN, self).coerce(*a, **kw)
try:
dn = distinguishedname.DistinguishedName(stringValue=val)
except distinguishedname.InvalidRelativeDistinguishedName, e:
raise annotate.InputError, \
"%r is not a valid LDAP DN: %s" % (val, e)
return dn
def __init__(self,
baseDN,
serviceLocationOverrides=None):
self.baseDN = distinguishedname.DistinguishedName(baseDN)
self.serviceLocationOverrides = {}
if serviceLocationOverrides is not None:
for k,v in serviceLocationOverrides.items():
dn = distinguishedname.DistinguishedName(k)
self.serviceLocationOverrides[dn]=v