Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from cheroot.test import webtest
import cheroot.wsgi
EPHEMERAL_PORT = 0
NO_INTERFACE = None # Using this or '' will cause an exception
ANY_INTERFACE_IPV4 = '0.0.0.0'
ANY_INTERFACE_IPV6 = '::'
config = {
cheroot.wsgi.Server: {
'bind_addr': (NO_INTERFACE, EPHEMERAL_PORT),
'wsgi_app': None,
},
cheroot.server.HTTPServer: {
'bind_addr': (NO_INTERFACE, EPHEMERAL_PORT),
'gateway': cheroot.server.Gateway,
},
}
def cheroot_server(server_factory):
"""Set up and tear down a Cheroot server instance."""
conf = config[server_factory].copy()
bind_port = conf.pop('bind_addr')[-1]
for interface in ANY_INTERFACE_IPV6, ANY_INTERFACE_IPV4:
try:
actual_bind_addr = (interface, bind_port)
httpserver = server_factory( # create it
bind_addr=actual_bind_addr,
**conf
)
from cheroot.test import webtest
import cheroot.wsgi
EPHEMERAL_PORT = 0
NO_INTERFACE = None # Using this or '' will cause an exception
ANY_INTERFACE_IPV4 = '0.0.0.0'
ANY_INTERFACE_IPV6 = '::'
config = {
cheroot.wsgi.Server: {
'bind_addr': (NO_INTERFACE, EPHEMERAL_PORT),
'wsgi_app': None,
},
cheroot.server.HTTPServer: {
'bind_addr': (NO_INTERFACE, EPHEMERAL_PORT),
'gateway': cheroot.server.Gateway,
},
}
def cheroot_server(server_factory):
"""Set up and tear down a Cheroot server instance."""
conf = config[server_factory].copy()
bind_port = conf.pop('bind_addr')[-1]
for interface in ANY_INTERFACE_IPV6, ANY_INTERFACE_IPV4:
try:
actual_bind_addr = (interface, bind_port)
httpserver = server_factory( # create it
bind_addr=actual_bind_addr,
**conf
)
"""Native adapter for serving CherryPy via its builtin server."""
import logging
import sys
import io
import cheroot.server
import cherrypy
from cherrypy._cperror import format_exc, bare_error
from cherrypy.lib import httputil
class NativeGateway(cheroot.server.Gateway):
"""Native gateway implementation allowing to bypass WSGI."""
recursive = False
def respond(self):
"""Obtain response from CherryPy machinery and then send it."""
req = self.req
try:
# Obtain a Request object from CherryPy
local = req.server.bind_addr
local = httputil.Host(local[0], local[1], '')
remote = req.conn.remote_addr, req.conn.remote_port
remote = httputil.Host(remote[0], remote[1], '')
scheme = req.scheme
sn = cherrypy.tree.script_name(req.uri or '/')
"""Native adapter for serving CherryPy via its builtin server."""
import logging
import sys
import io
import cheroot.server
import cherrypy
from cherrypy._cperror import format_exc, bare_error
from cherrypy.lib import httputil
class NativeGateway(cheroot.server.Gateway):
"""Native gateway implementation allowing to bypass WSGI."""
recursive = False
def respond(self):
"""Obtain response from CherryPy machinery and then send it."""
req = self.req
try:
# Obtain a Request object from CherryPy
local = req.server.bind_addr
local = httputil.Host(local[0], local[1], '')
remote = req.conn.remote_addr, req.conn.remote_port
remote = httputil.Host(remote[0], remote[1], '')
scheme = req.scheme
sn = cherrypy.tree.script_name(req.uri or '/')
max=max,
accepted_queue_size=accepted_queue_size,
accepted_queue_timeout=accepted_queue_timeout,
)
@property
def numthreads(self):
"""Set minimum number of threads."""
return self.requests.min
@numthreads.setter
def numthreads(self, value):
self.requests.min = value
class Gateway(server.Gateway):
"""A base class to interface HTTPServer with WSGI."""
def __init__(self, req):
"""Initialize WSGI Gateway instance with request.
Args:
req (HTTPRequest): current HTTP request
"""
super(Gateway, self).__init__(req)
self.started_response = False
self.env = self.get_environ()
self.remaining_bytes_out = None
@classmethod
def gateway_map(cls):
"""Create a mapping of gateways and their versions.
def resolve(cls, full_path):
"""Read WSGI app/Gateway path string and import application module."""
mod_path, _, app_path = full_path.partition(':')
app = getattr(import_module(mod_path), app_path or 'application')
with contextlib.suppress(TypeError):
if issubclass(app, server.Gateway):
return GatewayYo(app)
return cls(app)
def resolve(cls, full_path):
"""Read WSGI app/Gateway path string and import application module."""
mod_path, _, app_path = full_path.partition(':')
app = getattr(import_module(mod_path), app_path or 'application')
# suppress the `TypeError` exception, just in case `app` is not a class
with suppress(TypeError):
if issubclass(app, server.Gateway):
return GatewayYo(app)
return cls(app)
"""Native adapter for serving CherryPy via its builtin server."""
import logging
import sys
import io
import cheroot.server
import cherrypy
from cherrypy._cperror import format_exc, bare_error
from cherrypy.lib import httputil
class NativeGateway(cheroot.server.Gateway):
"""Native gateway implementation allowing to bypass WSGI."""
recursive = False
def respond(self):
"""Obtain response from CherryPy machinery and then send it."""
req = self.req
try:
# Obtain a Request object from CherryPy
local = req.server.bind_addr
local = httputil.Host(local[0], local[1], '')
remote = req.conn.remote_addr, req.conn.remote_port
remote = httputil.Host(remote[0], remote[1], '')
scheme = req.scheme
sn = cherrypy.tree.script_name(req.uri or '/')