Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testPartialLDAPModifyRequestEncodings_old(self):
"""LDAPModifyRequest(encoded="...") with too short input should throw BERExceptionInsufficientData"""
m=str(pureldap.LDAPModifyRequest(object='foo', modification=[pureldap.LDAPModification_delete(['bar'])]))
for i in xrange(len(m)):
self.assertRaises(pureber.BERExceptionInsufficientData, pureldap.LDAPModifyRequest, encoded=m[:i], berdecoder=pureber.BERDecoderContext())
def testPartialBERSequenceEncodings(self):
"""BERSequence(encoded="...") with too short input should throw BERExceptionInsufficientData"""
m=str(pureber.BERSequence([pureber.BERInteger(2)]))
assert len(m)==5
self.assertRaises(pureber.BERExceptionInsufficientData, pureber.BERSequence, encoded=m[:4], berdecoder=pureber.BERDecoderContext())
self.assertRaises(pureber.BERExceptionInsufficientData, pureber.BERSequence, encoded=m[:3], berdecoder=pureber.BERDecoderContext())
self.assertRaises(pureber.BERExceptionInsufficientData, pureber.BERSequence, encoded=m[:2], berdecoder=pureber.BERDecoderContext())
self.assertRaises(pureber.BERExceptionInsufficientData, pureber.BERSequence, encoded=m[:1], berdecoder=pureber.BERDecoderContext())
self.assertRaises(pureber.BERExceptionInsufficientData, pureber.BERSequence, encoded=MutableString(""), berdecoder=pureber.BERDecoderContext())
def testPartialLDAPModifyRequestEncodings(self):
"""LDAPModifyRequest(encoded="...") with too short input should throw BERExceptionInsufficientData"""
m=str(pureldap.LDAPModifyRequest(object='foo', modification=[pureldap.LDAPModification_delete(['bar'])]))
for i in xrange(len(m)):
self.assertRaises(pureber.BERExceptionInsufficientData, pureldap.LDAPModifyRequest, encoded=m[:i], berdecoder=pureber.BERDecoderContext())
def s(*l):
"""Join all members of list to a string. Integer members are chr()ed"""
r=''
for e in l:
if isinstance(e, types.IntType):
e=chr(e)
r=r+str(e)
return r
def l(s):
"""Split a string to ord's of chars."""
return map(lambda x: ord(x), s)
class BERBaseEquality(unittest.TestCase):
valuesToTest=(
(pureber.BERInteger, [0]),
(pureber.BERInteger, [1]),
(pureber.BERInteger, [4000]),
(pureber.BERSequence, [[pureber.BERInteger(1000), pureber.BERInteger(2000)]]),
(pureber.BERSequence, [[pureber.BERInteger(2000), pureber.BERInteger(1000)]]),
(pureber.BEROctetString, ["foo"]),
(pureber.BEROctetString, ["b��"]),
)
def testBERBaseEquality(self):
"""BER objects equal BER objects with same type and content"""
for class_, args in self.valuesToTest:
x=apply(class_, args)
y=apply(class_, args)
assert x==x
assert x==y
def testFromLDAPModifyRequestKnownValues(self):
"""LDAPModifyRequest(encoded="...") should give known result with known input"""
for args, kwargs, encoded in self.knownValues:
m=MutableString(apply(s,encoded))
m.append('foo')
result = pureldap.LDAPModifyRequest(encoded=m, berdecoder=pureber.BERDecoderContext())
assert m=='foo'
#TODO assert integer==result
def process_entry(client, args, search_filter, page_size=100, cookie=''):
basedn = args.base_dn
control_value = pureber.BERSequence([
pureber.BERInteger(page_size),
pureber.BEROctetString(cookie),
])
controls = [('1.2.840.113556.1.4.319', None, control_value)]
o = LDAPEntry(client, basedn)
results, resp_controls = yield o.search(
filterText=search_filter,
attributes=['dn'],
controls=controls,
return_controls=True)
cookie = get_paged_search_cookie(resp_controls)
defer.returnValue((results, cookie))
def dataReceived(self, recd):
self.buffer += recd
while 1:
try:
o, bytes = pureber.berDecodeObject(self.berdecoder, self.buffer)
except pureber.BERExceptionInsufficientData:
o, bytes = None, 0
self.buffer = self.buffer[bytes:]
if not o:
break
self.handle(o)
def get_paged_search_cookie(controls):
"""
Input: semi-parsed controls list from LDAP response;
list of tuples (controlType, criticality, controlValue).
Parses the controlValue and returns the cookie as a byte string.
"""
control_value = controls[0][2]
ber_context = pureber.BERDecoderContext()
ber_seq, bytes_used = pureber.berDecodeObject(ber_context, control_value)
raw_cookie = ber_seq[1]
cookie = raw_cookie.value
return cookie
def asLDAP(self):
if self._LDAP_OP is None:
raise NotImplementedError("%s.asLDAP not implemented"
% self.__class__.__name__)
tmplist = list(self)
newlist = []
for x in range(len(tmplist)):
if (isinstance(tmplist[x], six.text_type)):
value = tmplist[x].encode('utf-8')
newlist.append(value)
else:
value = tmplist[x]
newlist.append(value)
return pureber.BERSequence([
pureber.BEREnumerated(self._LDAP_OP),
pureber.BERSequence([ pureldap.LDAPAttributeDescription(self.key),
pureber.BERSet(map(pureldap.LDAPString, newlist)),
]),
sys.path.append('./ldaptor')
import binascii
from ldaptor import ldapfilter
from ldaptor.protocols import pureber, pureldap
from ldaptor.protocols.ldap import ldaperrors
from samba import ndr
from samba.dcerpc import nbt
from samba.dcerpc import misc
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import protocol, reactor, endpoints
berdecoder = pureldap.LDAPBERDecoderContext_TopLevel(
inherit=pureldap.LDAPBERDecoderContext_LDAPMessage(
fallback=pureldap.LDAPBERDecoderContext(
fallback=pureber.BERDecoderContext()),
inherit=pureldap.LDAPBERDecoderContext(
fallback=pureber.BERDecoderContext())))
def findDomain(filt):
if isinstance(filt, pureldap.LDAPFilter_and):
for x in filt:
d = findDomain(x)
if d != None: return d
if isinstance(filt, pureldap.LDAPFilter_equalityMatch):
if (filt.attributeDesc.value == 'DnsDomain'):
return filt.assertionValue.value
return None
lastDomain = None
def buildReply(data):