How to use the brewtils.stoppable_thread.StoppableThread function in brewtils

To help you get started, we’ve selected a few brewtils examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github beer-garden / beer-garden / src / app / beer_garden / api / thrift / server.py View on Github external
def stop(self):
        # Mark the thread as stopping
        StoppableThread.stop(self)

        # Close the socket - this will break self.trans.accept() call with an exception
        self.trans.close()

        # Wait some amount of time for all the futures to complete
        futures_status = wait(
            self.futures, timeout=self.WORKER_TIMEOUT, return_when=ALL_COMPLETED
        )

        # If there are still workers remaining after the timeout then we remove
        # references to them. We need to do this because workers are daemons but
        # concurrent.futures.thread adds a hook to join all workers with no timeout when
        # shutting down. So any hung worker prevents the application from shutting down.
        if futures_status.not_done:
            self.logger.warning(
                "There were still unfinished worker "
github beer-garden / beer-garden / src / app / beer_garden / api / thrift / server.py View on Github external
# -*- coding: utf-8 -*-
import logging
from concurrent.futures import wait, ThreadPoolExecutor, ALL_COMPLETED
from threading import Event

from brewtils.stoppable_thread import StoppableThread
from thriftpy2.server import TThreadedServer
from thriftpy2.thrift import TProcessor
from thriftpy2.transport import TServerSocket, TSSLServerSocket

import beer_garden.api.thrift


class BartenderThriftServer(TThreadedServer, StoppableThread):
    """Thrift server that uses a ThreadPoolExecutor to process requests"""

    # Amount of time (in seconds) after shutdown requested to wait for workers to finish
    WORKER_TIMEOUT = 5

    def __init__(self, *args, **kwargs):
        self.logger = logging.getLogger(__name__)
        self.display_name = "Thrift Server"
        self.pool = ThreadPoolExecutor(max_workers=25)
        self.futures = set()
        self.finished = Event()

        StoppableThread.__init__(
            self, logger=self.logger, name=kwargs.pop("name", "ThriftPyServer")
        )
        TThreadedServer.__init__(self, *args, **kwargs)
github beer-garden / beer-garden / src / app / beer_garden / bg_events / event_listener.py View on Github external
from brewtils.stoppable_thread import StoppableThread
from multiprocessing import Queue


class EventListener(StoppableThread):
    """
    Base class for Event Listeners
    """

    def __init__(self):
        super().__init__()
        self.events_queue = Queue()

    def receive_next_message(self, event):
        """
        Accepts new messages for the Events Listener to process in the order that are received

        :param event: The Event to be published
        """
        self.events_queue.put(event)
github beer-garden / beer-garden / src / app / beer_garden / local_plugins / manager.py View on Github external
INSTANCES = 2
    PLUGIN_ARGS = 3
    ENVIRONMENT = 4
    LOG_LEVEL = 5

    NAME = 6
    VERSION = 7
    DESCRIPTION = 8
    MAX_INSTANCES = 9
    ICON_NAME = 10
    DISPLAY_NAME = 11
    METADATA = 12
    NAMESPACE = 13


class PluginManager(StoppableThread):
    """Manages creation and destruction of PluginRunners"""

    logger = logging.getLogger(__name__)
    _instance = None

    runners: List[ProcessRunner] = []

    def __init__(
        self,
        plugin_dir=None,
        log_dir=None,
        connection_info=None,
        username=None,
        password=None,
    ):
        self.display_name = "Plugin Manager"
github beer-garden / beer-garden / src / app / beer_garden / db / sql / pruner.py View on Github external
# -*- coding: utf-8 -*-
import logging
from datetime import datetime, timedelta
from typing import Tuple, List

from brewtils.stoppable_thread import StoppableThread
from mongoengine import Q

from beer_garden.db.sql.models import Event, Request


class SqlPruner(StoppableThread):
    def __init__(self, tasks=None, run_every=None):
        self.logger = logging.getLogger(__name__)
        self.display_name = "SQL Pruner"
        self._run_every = (run_every or timedelta(minutes=15)).total_seconds()
        self._tasks = tasks or []

        super(SqlPruner, self).__init__(logger=self.logger, name="Remover")
github beer-garden / beer-garden / src / app / beer_garden / monitor.py View on Github external
# -*- coding: utf-8 -*-
import logging
from datetime import datetime, timedelta

from brewtils.models import Instance, Request
from brewtils.stoppable_thread import StoppableThread

import beer_garden.db.api as db
import beer_garden.queue.api as queue


class PluginStatusMonitor(StoppableThread):
    """Monitor plugin heartbeats and update plugin status"""

    def __init__(self, heartbeat_interval=10, timeout_seconds=30):
        self.logger = logging.getLogger(__name__)
        self.display_name = "Plugin Status Monitor"
        self.heartbeat_interval = heartbeat_interval
        self.timeout = timedelta(seconds=timeout_seconds)
        self.status_request = Request(command="_status", command_type="EPHEMERAL")

        super(PluginStatusMonitor, self).__init__(
            logger=self.logger, name="PluginStatusMonitor"
        )

    def run(self):
        self.logger.info(self.display_name + " is started")
github beer-garden / beer-garden / src / app / beer_garden / metrics.py View on Github external
# -*- coding: utf-8 -*-
import datetime
import logging
from http.server import ThreadingHTTPServer

from brewtils.stoppable_thread import StoppableThread
from prometheus_client import Gauge, Counter, Summary
from prometheus_client.exposition import MetricsHandler
from prometheus_client.registry import REGISTRY

from beer_garden.bg_utils.mongo.models import Request


class PrometheusServer(StoppableThread):
    """Wraps a ThreadingHTTPServer to serve Prometheus metrics"""

    def __init__(self, host, port):
        self.logger = logging.getLogger(__name__)
        self.display_name = "Prometheus Server"

        self._host = host
        self._port = port

        # Basically prometheus_client.exposition.start_http_server
        metrics_handler = MetricsHandler.factory(REGISTRY)
        self.httpd = ThreadingHTTPServer((host, port), metrics_handler)

        super(PrometheusServer, self).__init__(
            logger=self.logger, name="PrometheusServer"
        )
github beer-garden / beer-garden / src / app / beer_garden / local_plugins / monitor.py View on Github external
# -*- coding: utf-8 -*-
import logging

from brewtils.models import Instance
from brewtils.stoppable_thread import StoppableThread

import beer_garden.db.api as db
from beer_garden.local_plugins.manager import runners, RunnerManager


class LocalPluginMonitor(StoppableThread):
    """Object to constantly monitor that plugins are alive and working.

    When one is down and out, it will attempt to restart that plugin.
    """

    def __init__(self, plugin_manager):
        self.logger = logging.getLogger(__name__)
        self.display_name = "Local Plugin Monitor"
        self.plugin_manager = plugin_manager
        self.registry = LocalPluginRegistry.instance()

        super(LocalPluginMonitor, self).__init__(
            logger=self.logger, name="LocalPluginMonitor"
        )

    def run(self):
github beer-garden / beer-garden / src / app / beer_garden / events / events_manager.py View on Github external
# COMPONENTS #
events_queue = None


def establish_events_queue(queue: Queue = None):
    global events_queue

    if queue is None:
        context = multiprocessing.get_context("spawn")
        queue = context.Queue()

    events_queue = queue


class EventProcessor(StoppableThread):
    """
    Base class for Event Listeners
    """

    def __init__(self):
        super().__init__()
        self.events_queue = Queue()

    def receive_next_message(self, event):
        """
        Accepts new messages for the Events Listener to process in the order that are received

        :param event: The Event to be published
        """

        self.events_queue.put(event)
github beer-garden / beer-garden / src / app / beer_garden / db / mongo / pruner.py View on Github external
# -*- coding: utf-8 -*-
import logging
from datetime import datetime, timedelta
from typing import Tuple, List

from brewtils.stoppable_thread import StoppableThread
from mongoengine import Q

#from beer_garden.db.mongo.models import Event, Request
from beer_garden.db.mongo.new_models import Event, Request


class MongoPruner(StoppableThread):
    def __init__(self, tasks=None, run_every=None):
        self.logger = logging.getLogger(__name__)
        self.display_name = "Mongo Pruner"
        self._run_every = (run_every or timedelta(minutes=15)).total_seconds()
        self._tasks = tasks or []

        super(MongoPruner, self).__init__(logger=self.logger, name="Remover")

    def add_task(
        self, collection=None, field=None, delete_after=None, additional_query=None
    ):
        self._tasks.append(
            {
                "collection": collection,
                "field": field,
                "delete_after": delete_after,