Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import cheroot.server
from cheroot.test import webtest
import cheroot.wsgi
EPHEMERAL_PORT = 0
NO_INTERFACE = None # Using this or '' will cause an exception
ANY_INTERFACE_IPV4 = '0.0.0.0'
ANY_INTERFACE_IPV6 = '::'
config = {
cheroot.wsgi.Server: {
'bind_addr': (NO_INTERFACE, EPHEMERAL_PORT),
'wsgi_app': None,
},
cheroot.server.HTTPServer: {
'bind_addr': (NO_INTERFACE, EPHEMERAL_PORT),
'gateway': cheroot.server.Gateway,
},
}
def cheroot_server(server_factory):
"""Set up and tear down a Cheroot server instance."""
conf = config[server_factory].copy()
bind_port = conf.pop('bind_addr')[-1]
for interface in ANY_INTERFACE_IPV6, ANY_INTERFACE_IPV4:
try:
actual_bind_addr = (interface, bind_port)
httpserver = server_factory( # create it
bind_addr=actual_bind_addr,
import cheroot.server
from cheroot.test import webtest
import cheroot.wsgi
EPHEMERAL_PORT = 0
NO_INTERFACE = None # Using this or '' will cause an exception
ANY_INTERFACE_IPV4 = '0.0.0.0'
ANY_INTERFACE_IPV6 = '::'
config = {
cheroot.wsgi.Server: {
'bind_addr': (NO_INTERFACE, EPHEMERAL_PORT),
'wsgi_app': None,
},
cheroot.server.HTTPServer: {
'bind_addr': (NO_INTERFACE, EPHEMERAL_PORT),
'gateway': cheroot.server.Gateway,
},
}
def cheroot_server(server_factory):
"""Set up and tear down a Cheroot server instance."""
conf = config[server_factory].copy()
bind_port = conf.pop('bind_addr')[-1]
for interface in ANY_INTERFACE_IPV6, ANY_INTERFACE_IPV4:
try:
actual_bind_addr = (interface, bind_port)
httpserver = server_factory( # create it
bind_addr=actual_bind_addr,
def native_server():
"""Set up and tear down a Cheroot HTTP server instance."""
for srv in cheroot_server(cheroot.server.HTTPServer):
yield srv
def server(self, parsed_args):
"""Server."""
server_args = vars(self)
server_args['bind_addr'] = parsed_args['bind_addr']
if parsed_args.max is not None:
server_args['maxthreads'] = parsed_args.max
if parsed_args.numthreads is not None:
server_args['minthreads'] = parsed_args.numthreads
return server.HTTPServer(**server_args)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import sys
import six
from six.moves import filter
from . import server
from .workers import threadpool
from ._compat import ntob, bton
class Server(server.HTTPServer):
"""A subclass of HTTPServer which calls a WSGI application."""
wsgi_version = (1, 0)
"""The version of WSGI to produce."""
def __init__(
self,
bind_addr,
wsgi_app,
numthreads=10,
server_name=None,
max=-1,
request_queue_size=5,
timeout=10,
shutdown_timeout=5,
accepted_queue_size=-1,
# Set response status
req.status = str(status or '500 Server Error')
# Set response headers
for header, value in headers:
req.outheaders.append((header, value))
if (req.ready and not req.sent_headers):
req.sent_headers = True
req.send_headers()
# Set response body
for seg in body:
req.write(seg)
class CPHTTPServer(cheroot.server.HTTPServer):
"""Wrapper for cheroot.server.HTTPServer.
cheroot has been designed to not reference CherryPy in any way,
so that it can be used in other frameworks and applications.
Therefore, we wrap it here, so we can apply some attributes
from config -> cherrypy.server -> HTTPServer.
"""
def __init__(self, server_adapter=cherrypy.server):
"""Initialize CPHTTPServer."""
self.server_adapter = server_adapter
server_name = (self.server_adapter.socket_host or
self.server_adapter.socket_file or
None)