Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testFromBEROctetStringKnownValues(self):
    """BEROctetString(encoded="...") should give known result with known input

    Each known encoding is decoded with a trailing 'foo' appended; the test
    checks that only that trailer is left in the buffer and that the
    decoded object converts back to the same byte values.
    """
    # Only the encoded form is used here; the first element of each
    # known-value pair is deliberately ignored.
    for _, encoded in self.knownValues:
        # s(*encoded) is the direct replacement for the deprecated
        # apply(s, encoded).
        m = MutableString(s(*encoded))
        # Trailing data that must survive decoding untouched.
        m.append('foo')
        result = pureber.BEROctetString(encoded=m, berdecoder=pureber.BERDecoderContext())
        assert m == 'foo'
        # Build a real list of byte values: `encoded` is compared with ==,
        # which can never succeed against a lazy map object.
        result = [ord(ch) for ch in str(result)]
        assert encoded == result
def testFromBEREnumeratedKnownValues(self):
    """BEREnumerated(encoded="...") should give known result with known input

    Each known encoding is decoded with a trailing 'foo' appended; the test
    checks that only that trailer is left in the buffer and that the
    decoded value equals the expected integer.
    """
    for integer, encoded in self.knownValues:
        # s(*encoded) is the direct replacement for the deprecated
        # apply(s, encoded).
        m = MutableString(s(*encoded))
        # Trailing data that must survive decoding untouched.
        m.append('foo')
        result = pureber.BEREnumerated(encoded=m, berdecoder=pureber.BERDecoderContext())
        assert m == 'foo'
        result = result.value
        assert integer == result
def testSanity(self):
    """BEREnumerated(encoded=BEREnumerated(n)).value==n for -1000..1000

    Round-trip check over every tenth integer in that range: the encoded
    form, with 'foo' appended, is decoded again; afterwards only 'foo' may
    remain in the buffer and the decoded value must equal the input.
    """
    for expected in range(-1000, 1001, 10):
        wire = MutableString(pureber.BEREnumerated(expected))
        wire.append('foo')
        context = pureber.BERDecoderContext()
        decoded = pureber.BEREnumerated(encoded=wire, berdecoder=context)
        value = decoded.value
        assert wire == 'foo'
        assert expected == value
return self.__class__.__name__ + '(' + ', '.join(l) + ')'
class LDAPFilter_extensibleMatch(LDAPMatchingRuleAssertion):
    """Matching-rule assertion tagged ``CLASS_CONTEXT | 0x09``."""
    tag = CLASS_CONTEXT | 0x09

    def asText(self):
        """Render this filter as text.

        The result has the shape ``(<type>[:dn][:<matchingRule>]:=<value>)``.
        The attribute type and the matching rule are emitted only when the
        corresponding field is truthy, ``:dn`` only when ``dnAttributes`` is
        set and its value is true, and the match value is passed through
        ``self.escaper`` before being inserted.
        """
        pieces = ['(']
        if self.type:
            pieces.append(self.type.value)
        if self.dnAttributes and self.dnAttributes.value:
            pieces.append(':dn')
        if self.matchingRule:
            pieces.append(':' + self.matchingRule.value)
        pieces.append(':=')
        pieces.append(self.escaper(self.matchValue.value))
        pieces.append(')')
        return ''.join(pieces)
class LDAPBERDecoderContext_Filter(BERDecoderContext):
    """Decoder context whose ``Identities`` table maps the tag of each LDAP
    filter class to that class."""
    # Built from the class list so that every key is the class's own tag.
    # A generator (not a list comprehension) keeps the loop variable out of
    # the class namespace.
    Identities = dict(
        (filter_class.tag, filter_class)
        for filter_class in (
            LDAPFilter_and,
            LDAPFilter_or,
            LDAPFilter_not,
            LDAPFilter_equalityMatch,
            LDAPFilter_substrings,
            LDAPFilter_greaterOrEqual,
            LDAPFilter_lessOrEqual,
            LDAPFilter_present,
            LDAPFilter_approxMatch,
            LDAPFilter_extensibleMatch,
        )
    )
# Numeric codes for the three LDAP_SCOPE_* constants, assigned in order.
(LDAP_SCOPE_baseObject,
 LDAP_SCOPE_singleLevel,
 LDAP_SCOPE_wholeSubtree) = (0, 1, 2)
self.data.append(BEROctetString(self.controlValue))
return BERSequence.toWire(self)
class LDAPBERDecoderContext_LDAPControls(BERDecoderContext):
    """Decoder context that maps ``LDAPControl.tag`` to ``LDAPControl``."""
    Identities = {LDAPControl.tag: LDAPControl}
class LDAPBERDecoderContext_LDAPMessage(BERDecoderContext):
    """Decoder context that maps the tags of ``LDAPControls`` and
    ``LDAPSearchResultReference`` to those classes."""
    # A generator (not a list comprehension) keeps the loop variable out of
    # the class namespace.
    Identities = dict(
        (member.tag, member)
        for member in (LDAPControls, LDAPSearchResultReference)
    )
class LDAPBERDecoderContext_TopLevel(BERDecoderContext):
    """Decoder context that maps ``BERSequence.tag`` to ``LDAPMessage``."""
    # Note the key is the generic sequence tag, not LDAPMessage's own tag.
    Identities = {BERSequence.tag: LDAPMessage}
class LDAPModifyRequest(LDAPProtocolRequest, BERSequence):
tag = CLASS_APPLICATION | 0x06
object = None
modification = None
@classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, berdecoder)
assert len(l) == 2
r = klass(object=l[0].value,
+ "(entry=%s, tag=%d)" \
% (repr(entry), self.tag)
class LDAPDelResponse(LDAPResult):
    """``LDAPResult`` subclass tagged ``CLASS_APPLICATION | 0x0b``.

    Adds nothing beyond the tag; all behaviour comes from ``LDAPResult``.
    """
    tag = CLASS_APPLICATION | 0x0b
class LDAPModifyDNResponse_newSuperior(LDAPString):
    """``LDAPString`` subclass tagged ``CLASS_CONTEXT | 0x00``.

    Adds nothing beyond the tag; all behaviour comes from ``LDAPString``.
    """
    tag = CLASS_CONTEXT | 0x00
class LDAPBERDecoderContext_ModifyDNRequest(BERDecoderContext):
    """Decoder context that maps ``LDAPModifyDNResponse_newSuperior.tag`` to
    ``LDAPModifyDNResponse_newSuperior``."""
    Identities = {LDAPModifyDNResponse_newSuperior.tag: LDAPModifyDNResponse_newSuperior}
class LDAPModifyDNRequest(LDAPProtocolRequest, BERSequence):
tag = CLASS_APPLICATION | 12
entry = None
newrdn = None
deleteoldrdn = None
newSuperior = None
@classmethod
def fromBER(klass, tag, content, berdecoder=None):
l = berDecodeMultiple(content, LDAPBERDecoderContext_ModifyDNRequest(fallback=berdecoder))
auth = '*' * len(self.auth)
l = []
l.append('version=%d' % self.version)
l.append('dn=%s' % repr(repr_converter(self.dn)))
l.append('auth=%s' % repr(repr_converter(auth)))
if self.tag != self.__class__.tag:
l.append('tag=%d' % self.tag)
l.append('sasl=%s' % repr(self.sasl))
return self.__class__.__name__ + '(' + ', '.join(l) + ')'
class LDAPReferral(BERSequence):
    """``BERSequence`` subclass tagged ``CLASS_CONTEXT | 0x03``."""
    tag = CLASS_CONTEXT | 0x03
class LDAPBERDecoderContext_LDAPSearchResultReference(BERDecoderContext):
    """Decoder context that maps ``BEROctetString.tag`` to ``LDAPString``."""
    # Note the key is the generic octet-string tag, not LDAPString's own tag.
    Identities = {BEROctetString.tag: LDAPString}
class LDAPSearchResultReference(LDAPProtocolResponse, BERSequence):
tag = CLASS_APPLICATION | 0x13
def __init__(self, uris=None, tag=None):
    """Initialise both base classes and store the given URIs.

    :param uris: value stored as ``self.uris``; despite the ``None``
        default it is required, and omitting it fails the assertion.
    :param tag: forwarded to ``BERSequence.__init__``.
    :raises AssertionError: if ``uris`` is None.
    """
    LDAPProtocolResponse.__init__(self)
    # The sequence itself starts out empty; the URIs are kept separately.
    BERSequence.__init__(self, tag=tag, value=[])
    assert uris is not None
    self.uris = uris
@classmethod
def fromBER(cls, tag, content, berdecoder=None):
import binascii
from ldaptor import ldapfilter
from ldaptor.protocols import pureber, pureldap
from ldaptor.protocols.ldap import ldaperrors
from samba import ndr
from samba.dcerpc import nbt
from samba.dcerpc import misc
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import protocol, reactor, endpoints
# Nested decoder-context chain: a top-level context that inherits an
# LDAP-message context, which is itself given a plain LDAP context (falling
# back to the base BER context) as both its ``fallback`` and its ``inherit``.
# NOTE(review): the exact lookup semantics of ``fallback`` vs ``inherit`` are
# defined by BERDecoderContext, which is not visible here — confirm there.
berdecoder = pureldap.LDAPBERDecoderContext_TopLevel(
inherit=pureldap.LDAPBERDecoderContext_LDAPMessage(
fallback=pureldap.LDAPBERDecoderContext(
fallback=pureber.BERDecoderContext()),
inherit=pureldap.LDAPBERDecoderContext(
fallback=pureber.BERDecoderContext())))
def findDomain(filt):
    """Return the value asserted for 'DnsDomain' in an LDAP filter, if any.

    ``LDAPFilter_and`` nodes are searched recursively, in order, and the
    first hit wins.  An ``LDAPFilter_equalityMatch`` matches when its
    attribute description is exactly 'DnsDomain'.  Any other filter type,
    or a filter containing no such assertion, yields None.
    """
    if isinstance(filt, pureldap.LDAPFilter_and):
        for sub_filter in filt:
            domain = findDomain(sub_filter)
            # None is the "not found" sentinel, so test identity rather
            # than equality (which would go through the value's __ne__).
            if domain is not None:
                return domain
    if isinstance(filt, pureldap.LDAPFilter_equalityMatch):
        if filt.attributeDesc.value == 'DnsDomain':
            return filt.assertionValue.value
    return None
# Module-level state, initially unset; buildReply() declares it ``global``.
lastDomain = None
def buildReply(data):
global lastDomain
def __str__(self):
    """Return the error message, including the repr of ``self.onwire``."""
    # Wrap the operand in a 1-tuple: with a bare ``% value`` a tuple-valued
    # ``onwire`` would be unpacked as several format arguments and either
    # raise TypeError or print only its single element.
    return 'Cannot STARTTLS while operations on wire: %r' % (self.onwire,)
class LDAPClient(protocol.Protocol):
"""An LDAP client"""
debug = True
def __init__(self):
    """Set up the client's initial state."""
    # Connection flag starts as None rather than False.
    self.connected = None
    # Received data that has not yet been decoded into a complete object.
    self.buffer = ''
    # Starts empty; its contents are reported when STARTTLS is refused.
    # NOTE(review): presumably tracks requests awaiting a response — confirm.
    self.onwire = {}
# Decoder context passed to pureber.berDecodeObject() in dataReceived():
# a top-level context that inherits an LDAP-message context, which is itself
# given a plain LDAP context (falling back to the base BER context) as both
# its ``fallback`` and its ``inherit``.
# NOTE(review): the exact lookup semantics of ``fallback`` vs ``inherit`` are
# defined by BERDecoderContext, which is not visible here — confirm there.
berdecoder = pureldap.LDAPBERDecoderContext_TopLevel(
inherit=pureldap.LDAPBERDecoderContext_LDAPMessage(
fallback=pureldap.LDAPBERDecoderContext(fallback=pureber.BERDecoderContext()),
inherit=pureldap.LDAPBERDecoderContext(fallback=pureber.BERDecoderContext())))
def dataReceived(self, recd):
    """Append received data to the buffer and handle every decodable object.

    Objects are decoded from the front of ``self.buffer`` one at a time and
    passed to ``self.handle``; the consumed prefix is dropped after each
    decode.  The loop stops as soon as a decode yields a falsy object,
    which includes the case where the buffer holds too little data.

    :param recd: newly received data, concatenated onto ``self.buffer``.
    """
    self.buffer += recd
    while True:
        try:
            decoded = pureber.berDecodeObject(self.berdecoder, self.buffer)
        except pureldap.BERExceptionInsufficientData:
            # Incomplete object: nothing decoded and nothing consumed, so
            # the buffered data is kept for the next call.
            decoded = (None, 0)
        message, consumed = decoded
        self.buffer = self.buffer[consumed:]
        if not message:
            break
        self.handle(message)
def connectionMade(self):
"""TCP connection has opened"""