Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _makeOne(self, **kw):
from waitress.adjustments import Adjustments
return Adjustments(**kw)
def test_backward_compatibility(self):
from waitress.server import WSGIServer, TcpWSGIServer
from waitress.adjustments import Adjustments
self.assertTrue(WSGIServer is TcpWSGIServer)
self.inst = WSGIServer(None, _start=False, port=1234)
# Ensure the adjustment was actually applied.
self.assertNotEqual(Adjustments.port, 1234)
self.assertEqual(self.inst.adj.port, 1234)
def _makeOne(self, **kw):
from waitress.adjustments import Adjustments
return Adjustments(**kw)
_start=True, # test shim
_sock=None, # test shim
_dispatcher=None, # test shim
**kw # adjustments
):
"""
if __name__ == '__main__':
server = create_server(app)
server.run()
"""
if application is None:
raise ValueError(
'The "app" passed to ``create_server`` was ``None``. You forgot '
"to return a WSGI app within your application."
)
adj = Adjustments(**kw)
if map is None: # pragma: nocover
map = {}
dispatcher = _dispatcher
if dispatcher is None:
dispatcher = ThreadedTaskDispatcher()
dispatcher.set_thread_count(adj.threads)
if adj.unix_socket and hasattr(socket, "AF_UNIX"):
sockinfo = (socket.AF_UNIX, socket.SOCK_STREAM, None, None)
return UnixWSGIServer(
application,
map,
_start,
_sock,
def __init__(
self,
application,
map=None,
_start=True, # test shim
_sock=None, # test shim
dispatcher=None, # dispatcher
adj=None, # adjustments
sockinfo=None, # opaque object
bind_socket=True,
**kw
):
if adj is None:
adj = Adjustments(**kw)
if adj.trusted_proxy or adj.clear_untrusted_proxy_headers:
# wrap the application to deal with proxy headers
# we wrap it here because webtest subclasses the TcpWSGIServer
# directly and thus doesn't run any code that's in create_server
application = proxy_headers_middleware(
application,
trusted_proxy=adj.trusted_proxy,
trusted_proxy_count=adj.trusted_proxy_count,
trusted_proxy_headers=adj.trusted_proxy_headers,
clear_untrusted=adj.clear_untrusted_proxy_headers,
log_untrusted=adj.log_untrusted_proxy_headers,
logger=self.logger,
)
if map is None: