Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.trailers = {}
try:
for line in self.fp.read_trailer_lines():
if line[0] in b' \t':
# It's a continuation line.
v = line.strip()
else:
try:
k, v = line.split(b':', 1)
except ValueError:
raise ValueError('Illegal header line.')
k = k.strip().title()
v = v.strip()
if k in cheroot.server.comma_separated_headers:
existing = self.trailers.get(k)
if existing:
v = b', '.join((existing, v))
self.trailers[k] = v
except Exception:
e = sys.exc_info()[1]
if e.__class__.__name__ == 'MaxSizeExceeded':
# Post data is too big
raise cherrypy.HTTPError(
413, 'Maximum request length: %r' % e.args[1])
else:
raise
"""
WSGI server interface (see PEP 333).
This adds some CP-specific bits to the framework-agnostic cheroot package.
"""
import sys
import cheroot.wsgi
import cheroot.server
import cherrypy
class CPWSGIHTTPRequest(cheroot.server.HTTPRequest):
"""Wrapper for cheroot.server.HTTPRequest.
This is a layer, which preserves URI parsing mode like it which was
before Cheroot v5.8.0.
"""
def __init__(self, server, conn):
"""Initialize HTTP request container instance.
Args:
server (cheroot.server.HTTPServer):
web server object receiving this request
conn (cheroot.server.HTTPConnection):
HTTP connection object for this request
"""
super(CPWSGIHTTPRequest, self).__init__(
"""
WSGI server interface (see PEP 333).
This adds some CP-specific bits to the framework-agnostic cheroot package.
"""
import sys
import cheroot.wsgi
import cheroot.server
import cherrypy
class CPWSGIHTTPRequest(cheroot.server.HTTPRequest):
"""Wrapper for cheroot.server.HTTPRequest.
This is a layer, which preserves URI parsing mode like it which was
before Cheroot v5.8.0.
"""
def __init__(self, server, conn):
"""Initialize HTTP request container instance.
Args:
server (cheroot.server.HTTPServer):
web server object receiving this request
conn (cheroot.server.HTTPConnection):
HTTP connection object for this request
"""
super(CPWSGIHTTPRequest, self).__init__(
def makefile(self, sock, mode='r', bufsize=-1):
"""Return socket file object."""
cls = (
SSLFileobjectStreamReader
if 'r' in mode else
SSLFileobjectStreamWriter
)
if SSL and isinstance(sock, ssl_conn_type):
wrapped_socket = cls(sock, mode, bufsize)
wrapped_socket.ssl_timeout = sock.gettimeout()
return wrapped_socket
# This is from past:
# TODO: figure out what it's meant for
else:
return cheroot_server.CP_fileobject(sock, mode, bufsize)
self.nodelay = self.server_adapter.nodelay
if sys.version_info >= (3, 0):
ssl_module = self.server_adapter.ssl_module or 'builtin'
else:
ssl_module = self.server_adapter.ssl_module or 'pyopenssl'
if self.server_adapter.ssl_context:
adapter_class = cheroot.server.get_ssl_adapter_class(ssl_module)
self.ssl_adapter = adapter_class(
self.server_adapter.ssl_certificate,
self.server_adapter.ssl_private_key,
self.server_adapter.ssl_certificate_chain,
self.server_adapter.ssl_ciphers)
self.ssl_adapter.context = self.server_adapter.ssl_context
elif self.server_adapter.ssl_certificate:
adapter_class = cheroot.server.get_ssl_adapter_class(ssl_module)
self.ssl_adapter = adapter_class(
self.server_adapter.ssl_certificate,
self.server_adapter.ssl_private_key,
self.server_adapter.ssl_certificate_chain,
self.server_adapter.ssl_ciphers)
self.stats['Enabled'] = getattr(
self.server_adapter, 'statistics', False)