Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def stop(self):
# Mark the thread as stopping
StoppableThread.stop(self)
# Close the socket - this will break self.trans.accept() call with an exception
self.trans.close()
# Wait some amount of time for all the futures to complete
futures_status = wait(
self.futures, timeout=self.WORKER_TIMEOUT, return_when=ALL_COMPLETED
)
# If there are still workers remaining after the timeout then we remove
# references to them. We need to do this because workers are daemons but
# concurrent.futures.thread adds a hook to join all workers with no timeout when
# shutting down. So any hung worker prevents the application from shutting down.
if futures_status.not_done:
self.logger.warning(
"There were still unfinished worker "
# -*- coding: utf-8 -*-
import logging
from concurrent.futures import wait, ThreadPoolExecutor, ALL_COMPLETED
from threading import Event
from brewtils.stoppable_thread import StoppableThread
from thriftpy2.server import TThreadedServer
from thriftpy2.thrift import TProcessor
from thriftpy2.transport import TServerSocket, TSSLServerSocket
import beer_garden.api.thrift
class BartenderThriftServer(TThreadedServer, StoppableThread):
"""Thrift server that uses a ThreadPoolExecutor to process requests"""
# Amount of time (in seconds) after shutdown requested to wait for workers to finish
WORKER_TIMEOUT = 5
def __init__(self, *args, **kwargs):
self.logger = logging.getLogger(__name__)
self.display_name = "Thrift Server"
self.pool = ThreadPoolExecutor(max_workers=25)
self.futures = set()
self.finished = Event()
StoppableThread.__init__(
self, logger=self.logger, name=kwargs.pop("name", "ThriftPyServer")
)
TThreadedServer.__init__(self, *args, **kwargs)
from brewtils.stoppable_thread import StoppableThread
from multiprocessing import Queue
class EventListener(StoppableThread):
"""
Base class for Event Listeners
"""
def __init__(self):
super().__init__()
self.events_queue = Queue()
def receive_next_message(self, event):
"""
Accepts new messages for the Events Listener to process in the order that are received
:param event: The Event to be published
"""
self.events_queue.put(event)
INSTANCES = 2
PLUGIN_ARGS = 3
ENVIRONMENT = 4
LOG_LEVEL = 5
NAME = 6
VERSION = 7
DESCRIPTION = 8
MAX_INSTANCES = 9
ICON_NAME = 10
DISPLAY_NAME = 11
METADATA = 12
NAMESPACE = 13
class PluginManager(StoppableThread):
"""Manages creation and destruction of PluginRunners"""
logger = logging.getLogger(__name__)
_instance = None
runners: List[ProcessRunner] = []
def __init__(
self,
plugin_dir=None,
log_dir=None,
connection_info=None,
username=None,
password=None,
):
self.display_name = "Plugin Manager"
# -*- coding: utf-8 -*-
import logging
from datetime import datetime, timedelta
from typing import Tuple, List
from brewtils.stoppable_thread import StoppableThread
from mongoengine import Q
from beer_garden.db.sql.models import Event, Request
class SqlPruner(StoppableThread):
def __init__(self, tasks=None, run_every=None):
self.logger = logging.getLogger(__name__)
self.display_name = "SQL Pruner"
self._run_every = (run_every or timedelta(minutes=15)).total_seconds()
self._tasks = tasks or []
super(SqlPruner, self).__init__(logger=self.logger, name="Remover")
# -*- coding: utf-8 -*-
import logging
from datetime import datetime, timedelta
from brewtils.models import Instance, Request
from brewtils.stoppable_thread import StoppableThread
import beer_garden.db.api as db
import beer_garden.queue.api as queue
class PluginStatusMonitor(StoppableThread):
"""Monitor plugin heartbeats and update plugin status"""
def __init__(self, heartbeat_interval=10, timeout_seconds=30):
self.logger = logging.getLogger(__name__)
self.display_name = "Plugin Status Monitor"
self.heartbeat_interval = heartbeat_interval
self.timeout = timedelta(seconds=timeout_seconds)
self.status_request = Request(command="_status", command_type="EPHEMERAL")
super(PluginStatusMonitor, self).__init__(
logger=self.logger, name="PluginStatusMonitor"
)
def run(self):
self.logger.info(self.display_name + " is started")
# -*- coding: utf-8 -*-
import datetime
import logging
from http.server import ThreadingHTTPServer
from brewtils.stoppable_thread import StoppableThread
from prometheus_client import Gauge, Counter, Summary
from prometheus_client.exposition import MetricsHandler
from prometheus_client.registry import REGISTRY
from beer_garden.bg_utils.mongo.models import Request
class PrometheusServer(StoppableThread):
"""Wraps a ThreadingHTTPServer to serve Prometheus metrics"""
def __init__(self, host, port):
self.logger = logging.getLogger(__name__)
self.display_name = "Prometheus Server"
self._host = host
self._port = port
# Basically prometheus_client.exposition.start_http_server
metrics_handler = MetricsHandler.factory(REGISTRY)
self.httpd = ThreadingHTTPServer((host, port), metrics_handler)
super(PrometheusServer, self).__init__(
logger=self.logger, name="PrometheusServer"
)
# -*- coding: utf-8 -*-
import logging
from brewtils.models import Instance
from brewtils.stoppable_thread import StoppableThread
import beer_garden.db.api as db
from beer_garden.local_plugins.manager import runners, RunnerManager
class LocalPluginMonitor(StoppableThread):
"""Object to constantly monitor that plugins are alive and working.
When one is down and out, it will attempt to restart that plugin.
"""
def __init__(self, plugin_manager):
self.logger = logging.getLogger(__name__)
self.display_name = "Local Plugin Monitor"
self.plugin_manager = plugin_manager
self.registry = LocalPluginRegistry.instance()
super(LocalPluginMonitor, self).__init__(
logger=self.logger, name="LocalPluginMonitor"
)
def run(self):
# COMPONENTS #
events_queue = None
def establish_events_queue(queue: Queue = None):
global events_queue
if queue is None:
context = multiprocessing.get_context("spawn")
queue = context.Queue()
events_queue = queue
class EventProcessor(StoppableThread):
"""
Base class for Event Listeners
"""
def __init__(self):
super().__init__()
self.events_queue = Queue()
def receive_next_message(self, event):
"""
Accepts new messages for the Events Listener to process in the order that are received
:param event: The Event to be published
"""
self.events_queue.put(event)
# -*- coding: utf-8 -*-
import logging
from datetime import datetime, timedelta
from typing import Tuple, List
from brewtils.stoppable_thread import StoppableThread
from mongoengine import Q
#from beer_garden.db.mongo.models import Event, Request
from beer_garden.db.mongo.new_models import Event, Request
class MongoPruner(StoppableThread):
def __init__(self, tasks=None, run_every=None):
self.logger = logging.getLogger(__name__)
self.display_name = "Mongo Pruner"
self._run_every = (run_every or timedelta(minutes=15)).total_seconds()
self._tasks = tasks or []
super(MongoPruner, self).__init__(logger=self.logger, name="Remover")
def add_task(
self, collection=None, field=None, delete_after=None, additional_query=None
):
self._tasks.append(
{
"collection": collection,
"field": field,
"delete_after": delete_after,