Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testPartialBERSequenceEncodings(self):
"""BERSequence(encoded="...") with too short input should throw BERExceptionInsufficientData"""
m=str(pureber.BERSequence([pureber.BERInteger(2)]))
assert len(m)==5
self.assertRaises(pureber.BERExceptionInsufficientData, pureber.BERSequence, encoded=m[:4], berdecoder=pureber.BERDecoderContext())
self.assertRaises(pureber.BERExceptionInsufficientData, pureber.BERSequence, encoded=m[:3], berdecoder=pureber.BERDecoderContext())
self.assertRaises(pureber.BERExceptionInsufficientData, pureber.BERSequence, encoded=m[:2], berdecoder=pureber.BERDecoderContext())
self.assertRaises(pureber.BERExceptionInsufficientData, pureber.BERSequence, encoded=m[:1], berdecoder=pureber.BERDecoderContext())
self.assertRaises(pureber.BERExceptionInsufficientData, pureber.BERSequence, encoded=MutableString(""), berdecoder=pureber.BERDecoderContext())
def __init__(self, value=None, controls=None, id=None, tag=None):
BERSequence.__init__(self, value=[], tag=tag)
assert value is not None
self.id = id
if self.id is None:
self.id = alloc_ldap_message_id()
self.value = value
self.controls = controls
def toWire(self):
return BERSequence([
LDAPString(self.entry),
BERSequence(map(BERSequence, self.attributes)),
], tag=self.tag).toWire()
attributes = [(repr_converter(key), [repr_converter(v) for v in value]) for (key, value) in self.attributes]
return '{}(objectName={}, attributes={}{})'.format(
self.__class__.__name__,
repr(name),
repr(attributes),
', tag={}'.format(self.tag) if self.tag != self.__class__.tag else '',
)
class LDAPSearchResultDone(LDAPResult):
tag = CLASS_APPLICATION | 0x05
pass
class LDAPControls(BERSequence):
tag = CLASS_CONTEXT | 0x00
@classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, LDAPBERDecoderContext_LDAPControls(
inherit=berdecoder))
r = klass(l, tag=tag)
return r
class LDAPControl(BERSequence):
criticality = None
controlValue = None
@classmethod
def process_entry(client, args, search_filter, page_size=100, cookie=''):
basedn = args.base_dn
control_value = pureber.BERSequence([
pureber.BERInteger(page_size),
pureber.BEROctetString(cookie),
])
controls = [('1.2.840.113556.1.4.319', None, control_value)]
o = LDAPEntry(client, basedn)
results, resp_controls = yield o.search(
filterText=search_filter,
attributes=['dn'],
controls=controls,
return_controls=True)
cookie = get_paged_search_cookie(resp_controls)
defer.returnValue((results, cookie))
def toWire(self):
return BERSequence([
LDAPString(self.type),
BERSequence(self.substrings)], tag=self.tag).toWire()
def extendedRequest_LDAPPasswordModifyRequest(self, data, reply):
if not isinstance(data, pureber.BERSequence):
raise ldaperrors.LDAPProtocolError(
'Extended request PasswordModify expected a BERSequence.')
userIdentity = None
oldPasswd = None
newPasswd = None
for value in data:
if isinstance(
value,
pureldap.LDAPPasswordModifyRequest_userIdentity):
if userIdentity is not None:
raise ldaperrors.LDAPProtocolError(
'Extended request '
'PasswordModify received userIdentity twice.')
userIdentity = value.value
def __repr__(self):
name = repr_converter(self.object)
if self.tag==self.__class__.tag:
return self.__class__.__name__+"(object=%s, modification=%s)"\
%(repr(name), repr(self.modification))
else:
return self.__class__.__name__+"(object=%s, modification=%s, tag=%d)" \
%(repr(name), repr(self.modification), self.tag)
class LDAPModifyResponse(LDAPResult):
tag = CLASS_APPLICATION | 0x07
class LDAPAddRequest(LDAPProtocolRequest, BERSequence):
tag = CLASS_APPLICATION | 0x08
@classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, berdecoder)
r = klass(entry=l[0].value,
attributes=l[1],
tag=tag)
return r
def __init__(self, entry=None, attributes=None, tag=None):
"""
Initialize the object
Example usage::
LDAPFilter_extensibleMatch.tag: LDAPFilter_extensibleMatch,
}
LDAP_SCOPE_baseObject = 0
LDAP_SCOPE_singleLevel = 1
LDAP_SCOPE_wholeSubtree = 2
LDAP_DEREF_neverDerefAliases = 0
LDAP_DEREF_derefInSearching = 1
LDAP_DEREF_derefFindingBaseObj = 2
LDAP_DEREF_derefAlways = 3
LDAPFilterMatchAll = LDAPFilter_present('objectClass')
class LDAPSearchRequest(LDAPProtocolRequest, BERSequence):
tag = CLASS_APPLICATION | 0x03
baseObject = ''
scope = LDAP_SCOPE_wholeSubtree
derefAliases = LDAP_DEREF_neverDerefAliases
sizeLimit = 0
timeLimit = 0
typesOnly = 0
filter = LDAPFilterMatchAll
attributes = [] # TODO AttributeDescriptionList
# TODO decode
@classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, LDAPBERDecoderContext_Filter(fallback=berdecoder, inherit=berdecoder))