Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
#print uwsgi.pippo
#print 4/0
# try:
# print 4/0
#
# print uwsgi.pippo
# except Exception:
# print "bah"
# print "ok"
# yield 4/0
yield '<h1>uWSGI status ('+env['SCRIPT_NAME']+')</h1>'
yield 'masterpid: <b>' + str(uwsgi.masterpid()) + '</b><br>'
yield 'started on: <b>' + time.ctime(uwsgi.started_on) + '</b><br>'
yield 'buffer size: <b>' + str(uwsgi.buffer_size) + '</b><br>'
yield 'total_requests: <b>' + str(uwsgi.total_requests()) + '</b><br>'
yield 'workers: <b>' + str(uwsgi.numproc) + '</b><br>'
yield ''
yield ''
workers = uwsgi.workers()
yield '<h2>workers</h2><table border="1"><tbody><tr><th>worker id</th><th>pid</th><th>in request</th><th>requests</th><th>running time</th><th>address space</th><th>rss</th></tr></tbody></table>'
# Print the uWSGI option mapping.
print(uwsgi.opt)
# Value later converted with int() and used as the sleep duration inside
# sig_handler; falls back to 10 when 'test_mule_timeout' is not set.
sig_timeout = uwsgi.opt.get('test_mule_timeout', 10)
# Signal later converted with int() and passed to os.kill(); falls back to
# SIGINT when 'test_signal' is not set.
sig_to_send = uwsgi.opt.get('test_signal', signal.SIGINT)
def sig_handler(signum, frame):
    """Announce the received signal, stall, then terminate the process.

    Args:
        signum: number of the signal being handled.
        frame: stack frame supplied by the signal machinery (unused).

    Raises:
        SystemExit: always, with exit status 0, once the sleep has elapsed.
    """
    print('Hello from signal', signum)
    # Stall for the module-level ``sig_timeout`` (coerced to int) before
    # leaving.
    delay = int(sig_timeout)
    time.sleep(delay)
    raise SystemExit(0)
# Route both SIGINT and SIGHUP to the same handler.
for handled_signal in (signal.SIGINT, signal.SIGHUP):
    signal.signal(handled_signal, sig_handler)

# Pause for one second, then deliver the configured signal to the uWSGI
# master process.
time.sleep(1)
os.kill(uwsgi.masterpid(), int(sig_to_send))

# Keep pulling farm messages forever.
while True:
    uwsgi.farm_get_msg()
import uwsgi
import sys
from threading import Thread
try:
import cPickle as pickle
except:
import pickle
# Abort the import when uwsgi.masterpid() reports 0: the module insists on
# the uWSGI master process being enabled.
if uwsgi.masterpid() == 0:
    raise Exception("you have to enable the uWSGI master process "
                    "to use this module")

# Module-level containers, all empty at import time.
spooler_functions = dict()
mule_functions = dict()
postfork_chain = list()
def get_free_signal():
    """Return the lowest uWSGI signal number that is not registered yet.

    Signal numbers 0 through 255 are probed in ascending order with
    ``uwsgi.signal_registered``.

    Raises:
        Exception: if all 256 signal numbers are already registered.
    """
    # ``range`` rather than ``xrange``: the latter is undefined on Python 3,
    # and a 256-element range is cheap to build on Python 2 as well.
    for signum in range(256):
        if not uwsgi.signal_registered(signum):
            return signum

    raise Exception("No free uwsgi signal available")
def manage_spool_request(vars):
from functools import partial
import sys
from threading import Thread
try:
import cPickle as pickle
except ImportError:
import pickle
import uwsgi
# A master pid of 0 is treated as "master process not enabled"; the module
# refuses to load in that case.
if uwsgi.masterpid() == 0:
    raise Exception("you have to enable the uWSGI master process "
                    "to use this module")

# Module-level containers, all empty at import time.
spooler_functions = dict()
mule_functions = dict()
postfork_chain = list()
# Python3 compatibility
def _encode1(val):
if sys.version_info >= (3, 0) and isinstance(val, str):
return val.encode('utf-8')
else:
return val
"""
Contains all uwsgi decorators documented on `uWSGI documentation`_
.. _uWSGI documentation: http://projects.unbit.it/uwsgi/wiki/Decorators
"""
import uwsgi
from threading import Thread
try:
import cPickle as pickle
except:
import pickle
# Importing this module fails when uwsgi.masterpid() is 0, i.e. when the
# uWSGI master process has not been enabled.
if uwsgi.masterpid() == 0:
    raise Exception("you have to enable the uWSGI master process "
                    "to use this module")

# Module-level containers, all empty at import time.
spooler_functions = dict()
mule_functions = dict()
postfork_chain = list()
def get_free_signal():
    """Find the first unregistered uWSGI signal number.

    Probes signal numbers 0..255 in ascending order through
    ``uwsgi.signal_registered`` and returns the first one that is free.

    Raises:
        Exception: when none of the 256 signal numbers is available.
    """
    # ``xrange`` exists only on Python 2; ``range`` works on both major
    # versions and the sequence is tiny, so nothing is lost on Python 2.
    for signum in range(256):
        if not uwsgi.signal_registered(signum):
            return signum

    raise Exception("No free uwsgi signal available")
# We should have a good app_name by this point.
# Last conditional, if uwsgi, then wrap the name
# with the uwsgi process type
if basename == "uwsgi":
# We have an app name by this point. Now if running under
# uwsgi, augment the app name
try:
import uwsgi
if app_name == "uwsgi":
app_name = ""
else:
app_name = " [%s]" % app_name
if os.getpid() == uwsgi.masterpid():
uwsgi_type = "uWSGI master%s"
else:
uwsgi_type = "uWSGI worker%s"
app_name = uwsgi_type % app_name
except ImportError:
pass
return app_name
except Exception as e:
logger.debug("get_application_name: ", exc_info=True)
return app_name
def application(env, start_response):
print env
start_response('200 OK', [('Content-Type', 'text/html')])
yield '<h1>uWSGI %s status</h1>' % uwsgi.version
yield 'masterpid: <b>' + str(uwsgi.masterpid()) + '</b><br>'
yield 'started on: <b>' + time.ctime(uwsgi.started_on) + '</b><br>'
yield 'buffer size: <b>' + str(uwsgi.buffer_size) + '</b><br>'
yield 'total_requests: <b>' + str(uwsgi.total_requests()) + '</b><br>'
yield 'log size: <b>' + str(uwsgi.logsize()) + '</b><br>'
yield 'workers: <b>' + str(uwsgi.numproc) + '</b><br>'
yield "cwd: <b>%s</b><br>" % os.getcwd()
try:
yield "mode: <b>%s</b><br>" % uwsgi.mode
except Exception:
@staff_member_required
def index(request):
    """Render the ``uwsgi.html`` status page.

    Every worker dict returned by ``uwsgi.workers()`` is annotated in place
    with a ``load`` figure and a ``last_spawn_str`` timestamp string. When the
    ``spooler`` option is present, each spooler job is listed together with
    the result of ``uwsgi.parsefile`` for that job.
    """
    worker_list = uwsgi.workers()
    elapsed = time.time() - uwsgi.started_on
    for worker in worker_list:
        worker['load'] = (100 * (worker['running_time']/1000))/elapsed
        worker['last_spawn_str'] = time.ctime(worker['last_spawn'])

    if 'spooler' in uwsgi.opt:
        jobs = [{'file': job, 'env': uwsgi.parsefile(job)}
                for job in uwsgi.spooler_jobs()]
    else:
        jobs = []

    context = {
        'masterpid': uwsgi.masterpid(),
        'started_on': time.ctime(uwsgi.started_on),
        'buffer_size': uwsgi.buffer_size,
        'total_requests': uwsgi.total_requests(),
        'numproc': uwsgi.numproc,
        'workers': worker_list,
        'jobs': jobs,
    }
    return render_to_response('uwsgi.html', context,
                              RequestContext(request, {}))
def should_start_http_server(self):
    """Return True only when the current process is the uWSGI master.

    ``uwsgi`` is imported inside the method, so the import is attempted only
    when this check actually runs.
    """
    import uwsgi

    current_pid = os.getpid()
    return current_pid == uwsgi.masterpid()