Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import time
import pytest
from six.moves import http_client
import cheroot.server
from cheroot.test import webtest
import cheroot.wsgi
EPHEMERAL_PORT = 0
NO_INTERFACE = None  # Using this or '' will cause an exception
ANY_INTERFACE_IPV4 = '0.0.0.0'
ANY_INTERFACE_IPV6 = '::'

# Settings looked up by server class.  Both entries pair NO_INTERFACE with
# EPHEMERAL_PORT as their bind address.
config = {
    cheroot.wsgi.Server: dict(
        bind_addr=(NO_INTERFACE, EPHEMERAL_PORT),
        wsgi_app=None,
    ),
    cheroot.server.HTTPServer: dict(
        bind_addr=(NO_INTERFACE, EPHEMERAL_PORT),
        gateway=cheroot.server.Gateway,
    ),
}
def cheroot_server(server_factory):
"""Set up and tear down a Cheroot server instance."""
conf = config[server_factory].copy()
bind_port = conf.pop('bind_addr')[-1]
for interface in ANY_INTERFACE_IPV6, ANY_INTERFACE_IPV4:
def wsgi_server():
    """Yield every server that ``cheroot_server`` produces for ``cheroot.wsgi.Server``."""
    server_stream = cheroot_server(cheroot.wsgi.Server)
    # Re-yield item by item so this function is itself a generator.
    for running_server in server_stream:
        yield running_server
def __init__(self, server, conn):
    """Create the HTTP request container with proxy mode switched on.

    Args:
        server (cheroot.server.HTTPServer):
            web server object receiving this request
        conn (cheroot.server.HTTPConnection):
            HTTP connection object for this request
    """
    # Delegate to the parent initializer, always passing proxy_mode=True.
    base_init = super(CPWSGIHTTPRequest, self).__init__
    base_init(server, conn, proxy_mode=True)
class CPWSGIServer(cheroot.wsgi.Server):
"""Wrapper for cheroot.wsgi.Server.
cheroot has been designed to not reference CherryPy in any way,
so that it can be used in other frameworks and applications. Therefore,
we wrap it here, so we can set our own mount points from cherrypy.tree
and apply some attributes from config -> cherrypy.server -> wsgi.Server.
"""
fmt = 'CherryPy/{cherrypy.__version__} {cheroot.wsgi.Server.version}'
version = fmt.format(**globals())
def __init__(self, server_adapter=cherrypy.server):
"""Initialize CPWSGIServer instance.
Args:
server_adapter (cherrypy._cpserver.Server): ...
def server(self, parsed_args):
    """Build a ``wsgi.Server`` from the keyword arguments for *parsed_args*.

    The keyword arguments come from ``self.server_args(parsed_args)``.
    """
    server_cls = wsgi.Server
    options = self.server_args(parsed_args)
    return server_cls(**options)
},
"/": {
"root": WEBDAV_DIR,
"readonly": True,
}
},
"verbose": 1,
}
# Wrap the configuration in a WsgiDAV application object.
app = WsgiDAVApp(config)

# Keyword arguments for the server: the (host, port) pair from the
# configuration and the application to serve.
server_args = dict(
    bind_addr=(config["host"], config["port"]),
    wsgi_app=app,
)
server = wsgi.Server(**server_args)
class MyHandler(FileSystemEventHandler):
    """Event handler that relocates newly created entries into ``BLACKHOLE_DIR``."""

    def on_created(self, event):
        """Move the path reported by *event* into ``BLACKHOLE_DIR``.

        The entry keeps its base name; only its parent directory changes.

        Args:
            event: creation event whose ``src_path`` names the new entry.

        Raises:
            OSError: propagated from ``os.rename`` if the move fails.
        """
        source = event.src_path
        # os.path.join() with a single argument returns it unchanged, so the
        # source path is used directly.
        destination = os.path.join(BLACKHOLE_DIR, os.path.basename(source))
        os.rename(source, destination)
def watch_blackhole_folder():
observer = Observer()
observer.schedule(MyHandler(), path=WEBDAV_BLACKHOLE, recursive=False)
observer.start()
try:
def WSGIServer(server_address, wsgi_app):
    """Build a cheroot WSGI server bound to `server_address` that serves `wsgi_app`.

    Replace this function to customize the webserver or to plug in a
    different one.
    """
    from cheroot import wsgi

    httpd = wsgi.Server(server_address, wsgi_app, server_name="localhost")
    # TCP_NODELAY isn't supported on the JVM
    running_on_jvm = sys.platform.startswith("java")
    httpd.nodelay = not running_on_jvm
    return httpd
def run(self, handler): # pragma: no cover
    """Create a cheroot WSGI server for *handler* and start it.

    The server is bound to ``(self.host, self.port)`` and stored on
    ``self.server``.  When both ``certfile`` and ``keyfile`` are present in
    ``self.options``, a ``BuiltinSSLAdapter`` is attached before starting.

    Args:
        handler: WSGI application passed to the server as ``wsgi_app``.
    """
    from cheroot import wsgi
    from cheroot.ssl import builtin
    self.options['bind_addr'] = (self.host, self.port)
    self.options['wsgi_app'] = handler
    # Pull the SSL file settings out of the options so they are not forwarded
    # to wsgi.Server(**self.options).
    # NOTE(review): pop() mutates self.options, so these keys are gone after
    # the first call -- confirm run() is only invoked once per instance.
    certfile = self.options.pop('certfile', None)
    keyfile = self.options.pop('keyfile', None)
    chainfile = self.options.pop('chainfile', None)
    self.server = wsgi.Server(**self.options)
    # SSL is enabled only when both a certificate and a key were supplied;
    # chainfile may still be None here.
    if certfile and keyfile:
        self.server.ssl_adapter = builtin.BuiltinSSLAdapter(
            certfile, keyfile, chainfile)
    self.server.start()