Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def nworkers_changed_3(server, new_value, old_value):
    """Stub ``nworkers_changed`` hook: ignores its arguments and returns 3."""
    return 3
c.set("nworkers_changed", nworkers_changed_3)
assert c.nworkers_changed(1, 2, 3) == 3
def test_statsd_changes_logger():
    """Setting ``statsd_host`` switches ``logger_class`` to ``statsd.Statsd``."""
    c = config.Config()
    # Before any statsd setting the plain gunicorn logger is reported.
    assert c.logger_class == glogging.Logger
    c.set('statsd_host', 'localhost:12345')
    assert c.logger_class == statsd.Statsd
class MyLogger(glogging.Logger):
    """Dummy custom logger class for testing."""
def test_always_use_configured_logger():
    """An explicitly configured ``logger_class`` is kept after ``statsd_host`` is set."""
    c = config.Config()
    # The logger is configured by dotted path to MyLogger in this module.
    c.set('logger_class', __name__ + '.MyLogger')
    assert c.logger_class == MyLogger
    c.set('statsd_host', 'localhost:12345')
    # still uses custom logger over statsd
    assert c.logger_class == MyLogger
def test_load_enviroment_variables_config(monkeypatch):
monkeypatch.setenv("GUNICORN_CMD_ARGS", "--workers=4")
with AltArgs():
def test_get_username_from_basic_auth_header(auth):
    """The ``u`` access-log atom is ``brk0v`` for the supplied Authorization header.

    ``auth`` is the raw value placed in ``environ['HTTP_AUTHORIZATION']``.
    """
    # Minimal stand-ins exposing only the attributes set below.
    request = SimpleNamespace(headers=())
    response = SimpleNamespace(
        status='200', response_length=1024, sent=1024,
        headers=(('Content-Type', 'text/plain'),),
    )
    environ = {
        'REQUEST_METHOD': 'GET', 'RAW_URI': '/my/path?foo=bar',
        'PATH_INFO': '/my/path', 'QUERY_STRING': 'foo=bar',
        'SERVER_PROTOCOL': 'HTTP/1.1',
        'HTTP_AUTHORIZATION': auth,
    }
    logger = Logger(Config())
    atoms = logger.atoms(response, request, environ, datetime.timedelta(seconds=1))
    assert atoms['u'] == 'brk0v'
import logging
import socket
from re import sub
from gunicorn.glogging import Logger
# Instrumentation constants
# Keys read from a log call's ``extra`` mapping.
METRIC_VAR, VALUE_VAR, MTYPE_VAR = "metric", "value", "mtype"
# Metric kinds accepted under the MTYPE_VAR key.
GAUGE_TYPE, COUNTER_TYPE, HISTOGRAM_TYPE = "gauge", "counter", "histogram"
class Statsd(Logger):
    """statsD-based instrumentation, that passes as a logger
    """

    def __init__(self, cfg):
        """Set up the metric prefix, the statsD socket and the dogstatsd tags.

        ``cfg`` supplies ``statsd_prefix``, ``statsd_host`` (unpacked as a
        ``(host, port)`` pair) and ``dogstatsd_tags``.
        """
        Logger.__init__(self, cfg)
        # Rewrite the prefix so it ends in a single ".", dropping any extra
        # trailing dots; a value the pattern does not match (e.g. the empty
        # string) is left unchanged.
        self.prefix = sub(r"^(.+[^.]+)\.*$", "\\g<1>.", cfg.statsd_prefix)

        sock = None
        try:
            host, port = cfg.statsd_host
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.connect((host, int(port)))
        except Exception:
            # Best effort: fall back to ``self.sock = None`` rather than
            # failing logger construction. A socket that was created but
            # could not be connected is closed so its descriptor is not leaked.
            if sock is not None:
                sock.close()
            sock = None
        self.sock = sock

        self.dogstatsd_tags = cfg.dogstatsd_tags
import socket
import logging
from re import sub
from gunicorn.glogging import Logger
from gunicorn import six
# Instrumentation constants
# Keys read from a log call's ``extra`` mapping.
METRIC_VAR, VALUE_VAR, MTYPE_VAR = "metric", "value", "mtype"
# Metric kinds accepted under the MTYPE_VAR key.
GAUGE_TYPE, COUNTER_TYPE, HISTOGRAM_TYPE = "gauge", "counter", "histogram"
class Statsd(Logger):
"""statsD-based instrumentation, that passes as a logger
"""
def __init__(self, cfg):
    """Set up the metric prefix and the UDP socket to the statsD server.

    ``cfg`` supplies ``statsd_prefix`` and ``statsd_host`` (unpacked as a
    ``(host, port)`` pair).
    """
    Logger.__init__(self, cfg)
    # Rewrite the prefix so it ends in a single ".", dropping any extra
    # trailing dots; a value the pattern does not match (e.g. the empty
    # string) is left unchanged.
    self.prefix = sub(r"^(.+[^.]+)\.*$", "\\g<1>.", cfg.statsd_prefix)

    sock = None
    try:
        host, port = cfg.statsd_host
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.connect((host, int(port)))
    except Exception:
        # Best effort: fall back to ``self.sock = None`` rather than failing
        # logger construction. A socket that was created but could not be
        # connected is closed so its descriptor is not leaked.
        if sock is not None:
            sock.close()
        sock = None
    self.sock = sock
# Log errors and warnings
def critical(self, msg, *args, **kwargs):
def access(self, resp, req, environ, request_time):
    """Write an access-log entry, skipping health checks unless debugging.

    Requests whose ``req.path`` is ``/readiness`` or ``/healthz`` are dropped
    when the ``DEIS_DEBUG`` environment variable is unset or empty; every
    other request is forwarded to ``Logger.access``.
    """
    # health check endpoints are only logged in debug mode
    debug = os.environ.get('DEIS_DEBUG', False)
    if not debug and req.path in ('/readiness', '/healthz'):
        return
    Logger.access(self, resp, req, environ, request_time)
msg = style.HTTP_INFO(msg)
elif code == '304':
msg = style.HTTP_NOT_MODIFIED(msg)
elif code[0] == '3':
msg = style.HTTP_REDIRECT(msg)
elif code == '404':
msg = style.HTTP_NOT_FOUND(msg)
elif code[0] == '4':
msg = style.HTTP_BAD_REQUEST(msg)
else:
# Any 5XX, or any other response
msg = style.HTTP_SERVER_ERROR(msg)
return msg
class GunicornLogger(gunicorn.glogging.Logger):
"""Custom logger class to add styling to access logs.
Note that this is not a `logging.Logger` instance.
"""
datefmt = r'[%d/%b/%Y %H:%M:%S]' # Same date format as runserver.
def __init__(self, cfg):
    """Initialise the logger and pick the access-message styler.

    ``self.stylize`` takes ``(msg, resp)``. When the ``DJANGO_COLORS``
    environment variable equals ``'nocolor'`` it returns ``msg`` unchanged;
    otherwise it is ``colorize`` with ``color_style()`` bound as its first
    argument.
    """
    super(GunicornLogger, self).__init__(cfg)
    if os.environ.get('DJANGO_COLORS') == 'nocolor':
        # Pass-through styler: same call shape, no colouring.
        self.stylize = lambda msg, resp: msg
    else:
        self.stylize = functools.partial(colorize, color_style())
def now(self):
"""Override to return date in runserver's format.
"""
typ = extra.get(MTYPE_VAR, None)
if metric and value and typ:
if typ == GAUGE_TYPE:
self.gauge(metric, value)
elif typ == COUNTER_TYPE:
self.increment(metric, value)
elif typ == HISTOGRAM_TYPE:
self.histogram(metric, value)
else:
pass
# Log to parent logger only if there is something to say
if msg:
Logger.log(self, lvl, msg, *args, **kwargs)
except Exception:
Logger.warning(self, "Failed to log to statsd", exc_info=True)
def exception(self, msg, *args, **kwargs):
    """Log through ``Logger.exception`` and count the event.

    After delegating to the parent logger, increments the
    ``gunicorn.log.exception`` metric by 1.
    """
    Logger.exception(self, msg, *args, **kwargs)
    self.increment("gunicorn.log.exception", 1)
def __init__(self, *args, **kwargs):
    """Initialise the worker and hook logging into ``server``.

    Replaces ``Logger.access`` and ``Logger.error`` with ``_access`` and
    ``_error``, then registers ``self.log`` with ``server`` as the error
    logger and, unless both ``cfg.accesslog`` and ``cfg.logconfig`` are
    ``None``, as the access logger too.
    """
    Worker.__init__(self, *args, **kwargs)

    # These assignments patch the Logger class itself, not just this worker.
    Logger.access = _access
    Logger.error = _error

    # Access logging is disabled only when neither setting is configured.
    if self.cfg.accesslog is None and self.cfg.logconfig is None:
        server.set_access_logger(None)
    else:
        server.set_access_logger(self.log)

    # NOTE(review): assumed to run unconditionally, not only in the else
    # branch; the original indentation was lost. Confirm.
    server.set_error_logger(self.log)
metric = extra.get(METRIC_VAR, None)
value = extra.get(VALUE_VAR, None)
typ = extra.get(MTYPE_VAR, None)
if metric and value and typ:
if typ == GAUGE_TYPE:
self.gauge(metric, value)
elif typ == COUNTER_TYPE:
self.increment(metric, value)
elif typ == HISTOGRAM_TYPE:
self.histogram(metric, value)
else:
pass
# Log to parent logger only if there is something to say
if msg:
Logger.log(self, lvl, msg, *args, **kwargs)
except Exception:
Logger.warning(self, "Failed to log to statsd", exc_info=True)