Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import uwsgi
from threading import Thread
# Refuse to load when uWSGI reports a master pid of 0, which this module
# treats as "master process not enabled".
if uwsgi.masterpid() == 0:
    raise Exception(
        "You must enable the uWSGI master process to use this module"
    )

# A truthy `lazy` option is rejected outright.
if uwsgi.opt.get('lazy'):
    raise Exception(
        "uWSGI lazy mode is not supported by this module"
    )

# Registry consulted by manage_spool_request: maps the 'ud_spool_func' value
# of a spool request to the callable that handles it.
spooler_functions = {}

# Starts out empty; nothing in the visible code fills or reads it.
postfork_chain = []
def get_free_signal():
    """Return the lowest uWSGI signal number that is not registered yet.

    Signal numbers 0 through 255 are probed in ascending order with
    ``uwsgi.signal_registered``; the first one reported as unregistered is
    returned.

    Raises:
        Exception: if all 256 probed signal numbers are already registered.
    """
    # `range` replaces the Python 2-only `xrange`, which raises NameError on
    # Python 3. For 256 items the behaviour is the same on both versions.
    for signum in range(256):
        if not uwsgi.signal_registered(signum):
            return signum
    raise Exception("No free uwsgi signal available")
# Dispatch a spool request: look up the callable stored in spooler_functions
# under the request's 'ud_spool_func' value and call it with the whole request.
# A missing 'ud_spool_func' key, or a value that was never registered, raises
# KeyError from the lookups below.
# NOTE(review): the visible text stops right after this assignment, so `ret`
# is neither returned nor used here -- the remainder of the body appears to be
# missing; confirm against the original source.
def manage_spool_request(vars):
ret = spooler_functions[vars['ud_spool_func']](vars)
# NOTE(review): the `def` line that owns the statements below is not visible
# here; the bare `return` implies they are the tail of a function body.
# Confirm against the full source before relying on these comments.
#
# Disabled alternative kept for reference: read port, host and dockerhost from
# the [main] section of a config file instead of from the uWSGI options.
#conf_file = "/etc/image-scanner/image-scanner.conf"
#config = ConfigParser.RawConfigParser()
#try:
# # Check if we have a conf file
# config.read(conf_file)
# # If we find a conf-file, override it if passed via command line
# # else use the conf-file
# print uwsgi.opt.get('socket')
# port = config.get('main', 'port')
# host = config.get('main', 'hostip')
# dockerhost = config.get('main', 'dockerhost')
#except ConfigParser.NoSectionError:
# # No conf file found
# raise
# Split the uWSGI `socket` option at ':' into host and port. The two-name
# unpacking needs exactly one ':' in the value; anything else raises
# ValueError. NOTE(review): presumably the option is always set -- if .get()
# returns None the .split call fails with AttributeError; verify.
host, port = uwsgi.opt.get('socket').split(':')
# `dockerhost` is taken as-is from the uWSGI options (None when absent,
# assuming dict-like .get semantics -- TODO confirm).
dockerhost = uwsgi.opt.get('dockerhost')
# Note the order: port first, then host, then dockerhost.
return port, host, dockerhost
def __initialize_locks(self):
    """Add one ``RECV_MSG_FARM_<farm>`` lock name per configured farm.

    The number of existing locks is taken to be the uWSGI ``locks`` option
    (0 when unset) plus one. If that is smaller than the number of farms in
    ``self.stack._configured_farms``, nothing is added to ``self._locks``.

    Raises:
        RuntimeError: when fewer locks exist than there are configured farms;
            the message states the ``locks`` value to configure.
    """
    available = int(uwsgi.opt.get('locks', 0)) + 1
    farm_names = self.stack._configured_farms.keys()
    required = len(farm_names)

    if available < required:
        message = (
            'Need %i uWSGI locks but only %i exist(s): Set `locks = %i` in uWSGI configuration'
            % (required, available, required - 1)
        )
        raise RuntimeError(message)

    # Build the full list first so self._locks is only touched once every
    # name has been formed successfully.
    lock_names = ['RECV_MSG_FARM_' + farm for farm in farm_names]
    self._locks.extend(lock_names)
    # this would be nice, but in my 2.0.15 uWSGI, the uwsgi module has no set_option function, and I don't know if it'd work even if the function existed as documented
def _uwsgi_configured_mules():
    """Return the uWSGI ``mule`` option in list form.

    A single value -- either a string (per ``string_types``) or the literal
    ``True`` -- is wrapped in a one-element list. Any other value, including
    the ``[]`` default used when the option is absent, is returned unchanged.
    """
    configured = uwsgi.opt.get('mule', [])
    is_single_value = isinstance(configured, string_types) or configured is True
    if is_single_value:
        return [configured]
    return configured
"""
The uwsgi and uwsgidecorators packages are added automatically to the Python environment
when running under uWSGI. Here we attempt to detect the presence of these packages and
then use the appropriate hooks.
"""
from __future__ import absolute_import
from ..log import logger
from ..singletons import agent
# NOTE(review): no `except`/`finally` clause for this `try` is visible here;
# the statement continues past the end of the visible text.
try:
# Importing `uwsgi` is the detection step described in the module docstring.
import uwsgi
logger.debug("uWSGI options: %s", uwsgi.opt)
# Both options default to False when they are absent from uwsgi.opt.
opt_master = uwsgi.opt.get('master', False)
opt_lazy_apps = uwsgi.opt.get('lazy-apps', False)
# Warn only when both options are exactly False (identity check, so any other
# value -- even a falsy one -- counts as "enabled").
if uwsgi.opt.get('enable-threads', False) is False and uwsgi.opt.get('gevent', False) is False:
logger.warning("Required: Neither uWSGI threads or gevent is enabled. " +
"Please enable by using the uWSGI --enable-threads or --gevent option.")
# Register the postfork hook only with a truthy `master` option and
# `lazy-apps` exactly False.
if opt_master and opt_lazy_apps is False:
# --master is supplied in uWSGI options (otherwise uwsgidecorators package won't be available)
# When --lazy-apps is True, this postfork hook isn't needed
import uwsgidecorators
@uwsgidecorators.postfork
def uwsgi_handle_fork():
    """Post-fork hook: log the event, then call ``agent.handle_fork()``.

    Registered through the ``uwsgidecorators.postfork`` decorator.
    """
    logger.debug("Handling uWSGI fork...")
    agent.handle_fork()