Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def doOpen(self, opentype):
    """Create the child object that will handle a top-level OPEN sequence.

    The optional ``self.constraint`` is asked to vet ``opentype`` first.
    Each registry in ``self.topRegistries`` is then consulted in order; the
    first one that maps ``opentype`` to an opener is used to build the child.

    Returns the newly created child, with ``self.constraint`` attached to it
    when a constraint is set.

    Raises Violation when no registry knows ``opentype``.
    """
    # this is only called for top-level objects
    assert len(self.protocol.receiveStack) == 1
    if self.constraint:
        self.constraint.checkOpentype(opentype)
    for reg in self.topRegistries:
        opener = reg.get(opentype)
        if opener is not None:
            child = opener()
            break
    else:
        # for/else: reached only when the loop finished without `break`,
        # i.e. none of the registries had an opener for this opentype
        raise Violation("unknown top-level OPEN type %s" % (opentype,))
    if self.constraint:
        child.setConstraint(self.constraint)
    return child
def checkObject(self, obj, inbound):
    """Validate that ``obj`` is a text string within the configured limits.

    Checks, in order: the type (``six.text_type``), the upper length bound
    (skipped when ``self.maxLength`` is None), the lower length bound
    (``self.minLength``, always compared), and finally ``self.regexp`` when
    one is set.  ``inbound`` is accepted but not consulted here.

    Returns None on success; raises Violation on the first failed check.
    """
    if not isinstance(obj, six.text_type):
        raise Violation("not a unicode object")
    if self.maxLength is not None and len(obj) > self.maxLength:
        raise Violation("string too long (%d > %d)" %
                        (len(obj), self.maxLength))
    if len(obj) < self.minLength:
        raise Violation("string too short (%d < %d)" %
                        (len(obj), self.minLength))
    # search(), not match(): the pattern may hit anywhere in the string
    if self.regexp and not self.regexp.search(obj):
        raise Violation("regexp failed to match")
def checkToken(self, typebyte, size):
    """Vet the next incoming token of a vocabulary table.

    While ``self.key`` is None the next token is a key and must be an INT;
    otherwise it is a value and must be a STRING, which is additionally
    passed to ``self.valueConstraint`` when one is set.

    Raises Violation when the table already holds ``self.maxKeys`` entries,
    and BananaError when the token type is wrong for its position.
    """
    # NOTE(review): the nesting below was reconstructed from a flattened
    # listing; in particular the valueConstraint check is assumed to apply
    # only to values (the STRING branch) -- confirm against the upstream file.
    if self.maxKeys is not None and len(self.d) >= self.maxKeys:
        raise Violation("the table is full")
    if self.key is None:
        if typebyte != INT:
            raise BananaError("VocabUnslicer only accepts INT keys")
    else:
        if typebyte != STRING:
            raise BananaError("VocabUnslicer only accepts STRING values")
        if self.valueConstraint:
            self.valueConstraint.checkToken(typebyte, size)
def getRequest(self, reqID):
    """Return the pending request stored under ``reqID``.

    Looks ``reqID`` up in ``self.waitingForAnswers`` and raises Violation
    when there is no such entry.
    """
    # invoked by AnswerUnslicer and ErrorUnslicer
    try:
        return self.waitingForAnswers[reqID]
    except KeyError:
        # %s rather than %d: the text is identical for integers, but a
        # non-integer reqID no longer turns this into a TypeError that would
        # mask the Violation we actually want to report
        raise Violation("non-existent reqID '%s'" % (reqID,))
# NOTE(review): this listing has lost its indentation and is cut off after the
# dangling `try:` at the end; the remainder of the method is not visible
# here, so only the visible part is described.
#
# Visible behaviour: positional and keyword arguments are merged into one
# name -> value dict (`allargs`).  A Violation is raised when more positional
# values are given than there are entries in self.argumentNames, or when a
# keyword repeats a name already filled in positionally.  Each merged argument
# is then looked up with self.getKeywordArgConstraint().
def checkAllArgs(self, args, kwargs, inbound):
# first we map the positional arguments
allargs = {}
if len(args) > len(self.argumentNames):
raise Violation("method takes %d positional arguments (%d given)"
% (len(self.argumentNames), len(args)))
for i,argvalue in enumerate(args):
allargs[self.argumentNames[i]] = argvalue
# then the keyword arguments, rejecting any name that was already supplied
for argname,argvalue in kwargs.items():
if argname in allargs:
raise Violation("got multiple values for keyword argument '%s'"
% (argname,))
allargs[argname] = argvalue
# finally fetch the (accept, constraint) pair for every merged argument
for argname, argvalue in allargs.items():
accept, constraint = self.getKeywordArgConstraint(argname)
if not accept:
# this argument will be ignored by the far end. TODO: emit a
# warning
pass
# NOTE(review): the body of this `try:` is missing from the excerpt.
try:
def checkObject(self, obj, inbound):
    """Accept ``obj`` if at least one of ``self.alternatives`` accepts it.

    Each alternative's ``checkObject(obj, inbound)`` is tried in order.  A
    Violation from an alternative means "this one does not match" and the
    next one is tried.

    Returns None as soon as one alternative accepts; raises Violation when
    every alternative rejects (or there are none).
    """
    for alternative in self.alternatives:
        try:
            alternative.checkObject(obj, inbound)
        except Violation:
            # this alternative rejected obj; move on to the next one
            continue
        # the first acceptance settles it -- consulting the remaining
        # alternatives could not change the outcome
        return
    raise Violation("object type %s does not satisfy any of %s"
                    % (type(obj), self.alternatives))
def checkToken(self, typebyte, size):
    """Vet the next incoming token of a dictionary.

    Raises Violation when ``self.maxKeys`` is set and ``self.d`` already
    holds that many entries.  Otherwise the token is handed to
    ``self.keyConstraint`` while a key is expected (``self.gettingKey``) or
    to ``self.valueConstraint`` while a value is expected; an unset
    constraint means the token is not checked.
    """
    if self.maxKeys is not None and len(self.d) >= self.maxKeys:
        raise Violation("the dict is full")
    if self.gettingKey:
        if self.keyConstraint:
            self.keyConstraint.checkToken(typebyte, size)
    else:
        if self.valueConstraint:
            self.valueConstraint.checkToken(typebyte, size)
def checkObject(self, obj, inbound):
    """Validate that ``obj`` is a set of the permitted kind, size and content.

    ``self.mutable`` is tri-state: True demands a ``set``, False demands a
    ``frozenset``, anything else (e.g. None) accepts either.  The size is
    limited by ``self.maxLength`` unless that is None, and every member is
    passed to ``self.constraint.checkObject`` when a constraint is set.

    Returns None on success; raises Violation on the first failed check.
    """
    if not isinstance(obj, (set, frozenset)):
        raise Violation("not a set")
    # Explicit comparisons instead of truthiness: a value of None must fall
    # through both tests so that either kind of set is accepted.
    if self.mutable == True and not isinstance(obj, set):
        raise Violation("obj is a set, but not a mutable one")
    if self.mutable == False and not isinstance(obj, frozenset):
        raise Violation("obj is a set, but not an immutable one")
    if self.maxLength is not None and len(obj) > self.maxLength:
        raise Violation("set is too large")
    if self.constraint:
        for member in obj:
            self.constraint.checkObject(member, inbound)
def openerCheckToken(self, typebyte, size, opentype):
    """Vet a token that is about to be used as part of an OPEN index.

    A STRING token is accepted as long as ``size`` does not exceed
    ``self.maxIndexLength``; a VOCAB token is always accepted; any other
    token type is rejected.  ``opentype`` is part of the signature but is
    not consulted by this implementation.

    Returns None when the token is acceptable; raises Violation otherwise.
    """
    if typebyte == tokens.STRING:
        if size > self.maxIndexLength:
            why = "STRING token is too long, %d>%d" % \
                  (size, self.maxIndexLength)
            raise Violation(why)
    elif typebyte == tokens.VOCAB:
        return
    else:
        # TODO: hack for testing -- a Violation is raised here for now.  The
        # previous code also had a BananaError with the same message right
        # after this raise; it could never execute and has been dropped.
        raise Violation("index token 0x%02x not STRING or VOCAB" %
                        ord(typebyte))
# NOTE(review): headless fragment -- the `def` line and the opening `if`
# branch that the first `raise` and the `elif`/`else` below belong to are
# not part of this excerpt, and indentation has been lost.  The error messages
# match the openerCheckToken listing just above, so this is presumably the
# tail of another openerCheckToken implementation -- confirm against upstream.
raise Violation(why)
# For the ("copyable",) opentype the token size is limited to the length of
# the longest name currently registered in copyable.CopyableRegistry.
if opentype == ("copyable",):
# TODO: this is silly, of course (should pre-compute maxlen)
# NOTE(review): reduce() without an initial value raises TypeError on an
# empty sequence, i.e. if the registry has no entries.
maxlen = reduce(max,
[len(cname) \
for cname in list(copyable.CopyableRegistry.keys())]
)
if size > maxlen:
why = "copyable-classname token is too long, %d>%d" % \
(size, maxlen)
raise Violation(why)
elif typebyte == tokens.VOCAB:
return
else:
# TODO: hack for testing
raise Violation("index token 0x%02x not STRING or VOCAB" % \
six.byte2int(typebyte))
# NOTE(review): this follows an unconditional raise; if both sit in the same
# branch once indentation is restored, it can never execute -- confirm.
raise BananaError("index token 0x%02x not STRING or VOCAB" % \
six.byte2int(typebyte))