Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def assertSent(self, *shouldBeSent):
shouldBeSent = list(shouldBeSent)
msg = '%s expected to send %r but sent %r' % (
self.__class__.__name__,
shouldBeSent,
self.sent)
assert self.sent == shouldBeSent, msg
sentStr = b''.join([to_bytes(x) for x in self.sent])
shouldBeSentStr = b''.join([to_bytes(x) for x in shouldBeSent])
msg = '%s expected to send data %r but sent %r' % (
self.__class__.__name__,
shouldBeSentStr,
sentStr)
assert sentStr == shouldBeSentStr, msg
def assertSent(self, *shouldBeSent):
shouldBeSent = list(shouldBeSent)
msg = '%s expected to send %r but sent %r' % (
self.__class__.__name__,
shouldBeSent,
self.sent)
assert self.sent == shouldBeSent, msg
sentStr = b''.join([to_bytes(x) for x in self.sent])
shouldBeSentStr = b''.join([to_bytes(x) for x in shouldBeSent])
msg = '%s expected to send data %r but sent %r' % (
self.__class__.__name__,
shouldBeSentStr,
sentStr)
assert sentStr == shouldBeSentStr, msg
def toWire(self):
if self.message:
return b'%s: %s' % (self.name, to_bytes(self.message))
if self.name:
return self.name
return b'Unknown LDAP error %r' % self
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, LDAPBERDecoderContext_ModifyDNRequest(fallback=berdecoder))
kw = {}
try:
kw['newSuperior'] = to_bytes(l[3].value)
except IndexError:
pass
r = klass(entry=to_bytes(l[0].value),
newrdn=to_bytes(l[1].value),
deleteoldrdn=l[2].value,
tag=tag,
**kw)
return r
def attributeAsLDIF(attribute, value):
attribute = to_bytes(attribute)
value = to_bytes(value)
if value.startswith(b'\0') \
or value.startswith(b'\n') \
or value.startswith(b'\r') \
or value.startswith(b' ') \
or value.startswith(b':') \
or value.startswith(b'<') \
or value.endswith(b' ') \
or containsNonprintable(value):
return attributeAsLDIF_base64(attribute, value)
else:
return b'%s: %s\n' % (attribute, value)
def attributeAsLDIF(attribute, value):
attribute = to_bytes(attribute)
value = to_bytes(value)
if value.startswith(b'\0') \
or value.startswith(b'\n') \
or value.startswith(b'\r') \
or value.startswith(b' ') \
or value.startswith(b':') \
or value.startswith(b'<') \
or value.endswith(b' ') \
or containsNonprintable(value):
return attributeAsLDIF_base64(attribute, value)
else:
return b'%s: %s\n' % (attribute, value)
l.sort(key=lambda x: to_bytes(x[0]))
for key, values in l:
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, LDAPBERDecoderContext_ModifyDNRequest(fallback=berdecoder))
kw = {}
try:
kw['newSuperior'] = to_bytes(l[3].value)
except IndexError:
pass
r = klass(entry=to_bytes(l[0].value),
newrdn=to_bytes(l[1].value),
deleteoldrdn=l[2].value,
tag=tag,
**kw)
return r
def handleSearchResults(l):
if len(l) == 0:
raise ldaperrors.LDAPOther("No such DN")
elif len(l) == 1:
o = l[0]
attributeTypes = []
objectClasses = []
for text in o.get("attributeTypes", []):
attributeTypes.append(schema.AttributeTypeDescription(to_bytes(text)))
for text in o.get("objectClasses", []):
objectClasses.append(schema.ObjectClassDescription(to_bytes(text)))
assert attributeTypes, "LDAP server doesn't give attributeTypes for subschemaSubentry dn=%s" % o.dn
return (attributeTypes, objectClasses)
else:
raise ldaperrors.LDAPOther("DN matched multiple entries")
@param other: An LDAPEntry to compare to.
@return: None if equal, otherwise a ModifyOp that would make
this entry look like other.
"""
assert self.dn == other.dn
if self == other:
return None
r = []
myKeys = set(self.keys())
otherKeys = set(other.keys())
addedKeys = list(otherKeys - myKeys)
addedKeys.sort(key=to_bytes) # for reproducability only
for added in addedKeys:
r.append(delta.Add(added, other[added]))
deletedKeys = list(myKeys - otherKeys)
deletedKeys.sort(key=to_bytes) # for reproducability only
for deleted in deletedKeys:
r.append(delta.Delete(deleted, self[deleted]))
sharedKeys = list(myKeys & otherKeys)
sharedKeys.sort(key=to_bytes) # for reproducability only
for shared in sharedKeys:
addedValues = list(other[shared] - self[shared])
if addedValues:
addedValues.sort(key=to_bytes) # for reproducability only
r.append(delta.Add(shared, addedValues))