Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
else:
raise ValueError("Unexpected return value: %r" % (result,))
# Unfortunately Klein imposes this strange requirement that the request object
# be adaptable to KleinRequest. Klein only registers an adapter from
# twisted.web.server.Request - despite the fact that the adapter doesn't
# actually use the adaptee for anything.
#
# Here, register an adapter from the dummy request type so that tests can
# exercise Klein-based code without trying to use the real request type.
#
# See https://github.com/twisted/klein/issues/31
from twisted.python.components import registerAdapter
from klein.app import KleinRequest
from klein.interfaces import IKleinRequest
registerAdapter(KleinRequest, _DummyRequest, IKleinRequest)
def build_schema_test(name, schema, schema_store,
failing_instances, passing_instances):
"""
Create test case verifying that various instances pass and fail
verification with a given JSON Schema.
:param bytes name: Name of test case to create.
:param dict schema: Schema to test.
:param dict schema_store: The schema definitions.
:param list failing_instances: Instances which should fail validation.
:param list passing_instances: Instances which should pass validation.
:returns: The test case; a ``SynchronousTestCase`` subclass.
"""
def foo(request, bar):
krequest = IKleinRequest(request)
relative_url[0] = krequest.url_for('foo', {'bar': bar + 1},
force_external=True)
def foo(request, bar):
krequest = IKleinRequest(request)
relative_url[0] = krequest.url_for('foo', {'bar': bar + 1})
def branch_f(instance, request, *a, **kw):
IKleinRequest(request).branch_segments = kw.pop('__rest__', '').split('/')
return _call(instance, f, request, *a, **kw)
def branch_f(instance, request, *a, **kw):
IKleinRequest(request).branch_segments = (
kw.pop('__rest__', '').split('/')
)
return _call(instance, f, request, *a, **kw)
result = ensureDeferred(result)
return result
@implementer(IKleinRequest)
class KleinRequest(object):
def __init__(self, request):
self.branch_segments = ['']
self.mapper = None
def url_for(self, *args, **kwargs):
return self.mapper.build(*args, **kwargs)
registerAdapter(KleinRequest, Request, IKleinRequest)
class Klein(object):
"""
L{Klein} is an object which is responsible for maintaining the routing
configuration of our application.
@ivar _url_map: A C{werkzeug.routing.Map} object which will be used for
routing resolution.
@ivar _endpoints: A C{dict} mapping endpoint names to handler functions.
"""
_bound_klein_instances = weakref.WeakKeyDictionary()
_subroute_segments = 0
return f(instance, *args, **kwargs)
@implementer(IKleinRequest)
class KleinRequest(object):
def __init__(self, request):
self.branch_segments = ['']
self.mapper = None
def url_for(self, *args, **kwargs):
return self.mapper.build(*args, **kwargs)
registerAdapter(KleinRequest, Request, IKleinRequest)
class Klein(object):
"""
L{Klein} is an object which is responsible for maintaining the routing
configuration of our application.
@ivar _url_map: A C{werkzeug.routing.Map} object which will be used for
routing resolution.
@ivar _endpoints: A C{dict} mapping endpoint names to handler functions.
"""
_bound_klein_instances = weakref.WeakKeyDictionary()
def __init__(self):
self._url_map = Map()
def url_for(request, endpoint, *args, **kwargs):
"""
Compute the URL for an endpoint.
"""
kwargs["force_external"] = True
return IKleinRequest(request).url_for(endpoint, *args, **kwargs)
from zope.interface import implementer
from klein.resource import KleinResource
from klein.interfaces import IKleinRequest
__all__ = ['Klein', 'run', 'route', 'resource']
def _call(instance, f, *args, **kwargs):
if instance is None:
return f(*args, **kwargs)
return f(instance, *args, **kwargs)
@implementer(IKleinRequest)
class KleinRequest(object):
def __init__(self, request):
self.branch_segments = ['']
self.mapper = None
def url_for(self, *args, **kwargs):
return self.mapper.build(*args, **kwargs)
registerAdapter(KleinRequest, Request, IKleinRequest)
class Klein(object):
"""
L{Klein} is an object which is responsible for maintaining the routing
"run",
"route",
"resource",
)
def _call(instance, f, *args, **kwargs):
if instance is not None or getattr(f, "__klein_bound__", False):
args = (instance,) + args
result = f(*args, **kwargs)
if iscoroutine(result):
result = ensureDeferred(result)
return result
@implementer(IKleinRequest)
class KleinRequest(object):
def __init__(self, request):
self.branch_segments = ['']
self.mapper = None
def url_for(self, *args, **kwargs):
return self.mapper.build(*args, **kwargs)
registerAdapter(KleinRequest, Request, IKleinRequest)
class Klein(object):
"""
L{Klein} is an object which is responsible for maintaining the routing