Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_execute_app_returns_filewrapper_prepare_returns_True(self):
    # The application returns a ReadOnlyFileBasedBuffer and declares a
    # Content-Length of 3 itself.  After execute() something must have
    # been written to the channel (the header) and the wrapper object
    # must be the only entry in channel.otherdata.
    from waitress import buffers

    wrapper = buffers.ReadOnlyFileBasedBuffer(io.BytesIO(b"abc"), 8192)

    def app(environ, start_response):
        headers = [("Content-Length", "3")]
        start_response("200 OK", headers)
        return wrapper

    task = self._makeOne()
    task.channel.server.application = app
    task.execute()

    self.assertTrue(task.channel.written)  # header
    self.assertEqual(task.channel.otherdata, [wrapper])
def test_execute_app_returns_filewrapper_prepare_returns_True_nocl(self):
    # Same scenario as the plain file-wrapper test, except the application
    # sends no headers at all.  The task is then expected to end up with
    # content_length == 3, the size of the wrapped payload.
    from waitress import buffers

    wrapper = buffers.ReadOnlyFileBasedBuffer(io.BytesIO(b"abc"), 8192)

    def app(environ, start_response):
        no_headers = []
        start_response("200 OK", no_headers)
        return wrapper

    task = self._makeOne()
    task.channel.server.application = app
    task.execute()

    self.assertTrue(task.channel.written)  # header
    self.assertEqual(task.channel.otherdata, [wrapper])
    self.assertEqual(task.content_length, 3)
def test_execute_app_returns_filewrapper_prepare_returns_True_badcl(self):
    # The task is pre-seeded with a Content-Length of 10 that disagrees
    # with the 3-byte payload behind the file wrapper.  After execute()
    # both task.content_length and the Content-Length response header
    # are expected to read 3.
    from waitress import buffers

    wrapper = buffers.ReadOnlyFileBasedBuffer(io.BytesIO(b"abc"), 8192)

    def app(environ, start_response):
        no_headers = []
        start_response("200 OK", no_headers)
        return wrapper

    task = self._makeOne()
    task.channel.server.application = app
    task.content_length = 10
    task.response_headers = [("Content-Length", "10")]
    task.execute()

    self.assertTrue(task.channel.written)  # header
    self.assertEqual(task.channel.otherdata, [wrapper])
    self.assertEqual(task.content_length, 3)
    final_headers = dict(task.response_headers)
    self.assertEqual(final_headers["Content-Length"], "3")
def _makeOne(self, file, block_size=8192):
    """Create a ``ReadOnlyFileBasedBuffer`` around *file*.

    The new buffer is also appended to ``self.buffers_to_close`` before it
    is returned to the caller.
    """
    from waitress import buffers

    wrapper = buffers.ReadOnlyFileBasedBuffer(file, block_size)
    self.buffers_to_close.append(wrapper)
    return wrapper
def test_write_soon_filewrapper(self):
    # Passing a prepared file wrapper to write_soon() should report 3 bytes
    # and leave three output buffers: the original one, the wrapper itself,
    # and a new OverflowableBuffer after it.
    from waitress import buffers

    wrapper = buffers.ReadOnlyFileBasedBuffer(io.BytesIO(b"abc"), 8192)
    wrapper.prepare()

    channel, _sock, _socket_map = self._makeOneWithMap()
    outbufs = channel.outbufs
    first_outbuf = outbufs[0]

    self.assertEqual(channel.write_soon(wrapper), 3)
    self.assertEqual(len(outbufs), 3)
    self.assertEqual(outbufs[0], first_outbuf)
    self.assertEqual(outbufs[1], wrapper)
    trailing_buffer = outbufs[2]
    self.assertEqual(trailing_buffer.__class__.__name__, "OverflowableBuffer")
def write_soon(self, data):
    """Queue *data* on the output buffers and return ``len(data)``.

    Falsy *data* is ignored and ``0`` is returned.  An object whose class
    is exactly ``ReadOnlyFileBasedBuffer`` is appended to ``self.outbufs``
    as its own entry, followed by a new ``OverflowableBuffer``; any other
    data is appended to the last buffer in ``self.outbufs``.
    """
    if not data:
        return 0

    # The async mainloop may be popping data off the output buffers at the
    # same time; blocking on the lock is acceptable here because this runs
    # in a task thread.
    with self.outbuf_lock:
        if data.__class__ is ReadOnlyFileBasedBuffer:
            # The application used wsgi.file_wrapper.
            pending = self.outbufs
            pending.append(data)
            pending.append(OverflowableBuffer(self.adj.outbuf_overflow))
        else:
            self.outbufs[-1].append(data)

    # XXX We might eventually need to pull the trigger here (to instruct
    # select to stop blocking), but it slows things down so much that it is
    # held off for now; "server push" on otherwise unbusy systems may suffer.
    return len(data)
value = value.strip()
mykey = rename_headers.get(key, None)
if mykey is None:
mykey = 'HTTP_%s' % key
if mykey not in environ:
environ[mykey] = value
# the following environment variables are required by the WSGI spec
environ['wsgi.version'] = (1, 0)
environ['wsgi.url_scheme'] = wsgi_url_scheme
environ['wsgi.errors'] = sys.stderr # apps should use the logging module
environ['wsgi.multithread'] = True
environ['wsgi.multiprocess'] = False
environ['wsgi.run_once'] = False
environ['wsgi.input'] = request.get_body_stream()
environ['wsgi.file_wrapper'] = ReadOnlyFileBasedBuffer
self.environ = environ
return environ
def write_soon(self, data):
if not self.connected:
# if the socket is closed then interrupt the task so that it
# can cleanup possibly before the app_iter is exhausted
raise ClientDisconnected
if data:
# the async mainloop might be popping data off outbuf; we can
# block here waiting for it because we're in a task thread
with self.outbuf_lock:
self._flush_outbufs_below_high_watermark()
if not self.connected:
raise ClientDisconnected
num_bytes = len(data)
if data.__class__ is ReadOnlyFileBasedBuffer:
# they used wsgi.file_wrapper
self.outbufs.append(data)
nextbuf = OverflowableBuffer(self.adj.outbuf_overflow)
self.outbufs.append(nextbuf)
self.current_outbuf_count = 0
else:
if self.current_outbuf_count > self.adj.outbuf_high_watermark:
# rotate to a new buffer if the current buffer has hit
# the watermark to avoid it growing unbounded
nextbuf = OverflowableBuffer(self.adj.outbuf_overflow)
self.outbufs.append(nextbuf)
self.current_outbuf_count = 0
self.outbufs[-1].append(data)
self.current_outbuf_count += num_bytes
self.total_outbufs_len += num_bytes
if self.total_outbufs_len >= self.adj.send_bytes:
value = value.strip()
mykey = rename_headers.get(key, None)
if mykey is None:
mykey = 'HTTP_%s' % key
if mykey not in environ:
environ[mykey] = value
# the following environment variables are required by the WSGI spec
environ['wsgi.version'] = (1, 0)
environ['wsgi.url_scheme'] = wsgi_url_scheme
environ['wsgi.errors'] = sys.stderr # apps should use the logging module
environ['wsgi.multithread'] = True
environ['wsgi.multiprocess'] = False
environ['wsgi.run_once'] = False
environ['wsgi.input'] = request.get_body_stream()
environ['wsgi.file_wrapper'] = ReadOnlyFileBasedBuffer
self.environ = environ
return environ