Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
watchers = []
for watcher in cfg.get('watchers', []):
watchers.append(Watcher.load_from_config(watcher))
sockets = []
for socket in cfg.get('sockets', []):
sockets.append(CircusSocket.load_from_config(socket))
httpd = cfg.get('httpd', False)
if httpd:
# controlling that we have what it takes to run the web UI
# if something is missing this will tell the user
try:
import circusweb # NOQA
except ImportError:
logger.error('You need to install circus-web')
sys.exit(1)
# creating arbiter
arbiter = cls(watchers, cfg['endpoint'], cfg['pubsub_endpoint'],
check_delay=cfg.get('check_delay', 1.),
prereload_fn=cfg.get('prereload_fn'),
statsd=cfg.get('statsd', False),
stats_endpoint=cfg.get('stats_endpoint'),
multicast_endpoint=cfg.get('multicast_endpoint'),
plugins=cfg.get('plugins'), sockets=sockets,
warmup_delay=cfg.get('warmup_delay', 0),
httpd=httpd,
httpd_host=cfg.get('httpd_host', 'localhost'),
httpd_port=cfg.get('httpd_port', 8080),
debug=cfg.get('debug', False),
ssh_server=cfg.get('ssh_server', None),
watchers = []
for watcher in cfg.get('watchers', []):
watchers.append(Watcher.load_from_config(watcher))
sockets = []
for socket_ in cfg.get('sockets', []):
sockets.append(CircusSocket.load_from_config(socket_))
httpd = cfg.get('httpd', False)
if httpd:
# controlling that we have what it takes to run the web UI
# if something is missing this will tell the user
try:
import circusweb # NOQA
except ImportError:
logger.error('You need to install circus-web')
sys.exit(1)
# creating arbiter
arbiter = cls(watchers, cfg['endpoint'], cfg['pubsub_endpoint'],
check_delay=cfg.get('check_delay', 1.),
prereload_fn=cfg.get('prereload_fn'),
statsd=cfg.get('statsd', False),
stats_endpoint=cfg.get('stats_endpoint'),
papa_endpoint=cfg.get('papa_endpoint'),
multicast_endpoint=cfg.get('multicast_endpoint'),
plugins=cfg.get('plugins'), sockets=sockets,
warmup_delay=cfg.get('warmup_delay', 0),
httpd=httpd,
loop=loop,
httpd_host=cfg.get('httpd_host', 'localhost'),
httpd_port=cfg.get('httpd_port', 8080),
def check_future_exception_and_log(future):
if isinstance(future, concurrent.Future):
exception = future.exception()
if exception is not None:
logger.error("exception %s caught" % exception)
if hasattr(future, "exc_info"):
exc_info = future.exc_info()
traceback.print_tb(exc_info[2])
return exception
def get_worker_states(name, wid, base_port, minimum_age=0.0):
stats = get_uwsgi_stats(name, wid, base_port)
if 'workers' not in stats:
logger.error("Error: No workers found for WID %d of %d", wid, name)
return ['unknown']
workers = stats['workers']
return [
worker["status"] if 'status' in worker and worker['last_spawn'] < time() - minimum_age else 'unknown'
for worker in workers
]
def children_started(watcher, arbiter, hook_name, pid, **kwargs):
name = watcher.name
wid = watcher.processes[pid].wid
base_port = int(watcher._options.get('stats_base_port', 8090))
logger.info('%s waiting for workers', name)
try:
wait_for_workers(name, wid, base_port, 'running', timeout_seconds=15,
minimum_age=5)
return True
except TimeoutError:
logger.error('%s children are flapping on %d', name, pid)
return False
except Exception:
logger.error('%s not publishing stats on %d', name, pid)
return False
def _dispatch_callback(self, msg, cid, mid, cast, cmd_name, resp=None):
if resp is None:
resp = ok()
if not isinstance(resp, (dict, list)):
msg = "msg %r tried to send a non-dict: %s" % (msg, str(resp))
logger.error("msg %r tried to send a non-dict: %s", msg, str(resp))
return self.send_error(mid, cid, msg, "server error", cast=cast,
errno=errors.BAD_MSG_DATA_ERROR)
if isinstance(resp, list):
resp = {"results": resp}
self.send_ok(mid, cid, msg, resp, cast=cast)
if cmd_name.lower() == "quit":
if cid is not None:
self.stream.flush()
def get_uwsgi_stats(name, wid, base_port):
sock = socket.create_connection(('127.0.0.1', base_port + wid), timeout=1)
received = sock.recv(100000)
data = bytes()
while received:
data += received
received = sock.recv(100000)
if not data:
logger.error(
"Error: No stats seem available for WID %d of %s", wid, name)
return
# recent versions of uWSGI had some garbage in the JSON so strip it out
data = data.decode('latin', 'replace')
data = NON_JSON_CHARACTERS.sub('', data)
return loads(data)
def handle_recv(self, data):
"""Handle received message from circusd
We need to handle two messages:
- spawn: add a new monitored child pid
- reap: remove a killed child pid from monitoring
"""
watcher_name, action, msg = self.split_data(data)
logger.debug("received data from circusd: watcher.%s.%s, %s",
watcher_name, action, msg)
# check if monitored watchers:
if self._match_watcher_name(watcher_name):
try:
message = self.load_message(msg)
except ValueError:
logger.error("Error while decoding json for message: %s",
msg)
else:
if "process_pid" not in message:
logger.warning('no process_pid in message')
return
pid = str(message.get("process_pid"))
if action == "spawn":
self.pid_status[pid] = dict(watcher=watcher_name,
last_activity=time.time())
logger.info("added new monitored pid for %s:%s",
watcher_name,
pid)
# very questionable fix for Py3 here!
# had to add check for pid in self.pid_status
elif action == "reap" and pid in self.pid_status:
old_pid = self.pid_status.pop(pid)