Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def log_message(self, fmt, *args):
logger = get_logger(__name__, type(self))
logger.debug(fmt, *args)
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see .
import ctypes
import inspect
import platform
from ..logging import get_logger
__all__ = ["Interrupt", "raise_thread_exception"]
logger = get_logger(__name__)
current_platform = platform.python_implementation()
supported_platforms = {"CPython"}
class Interrupt(BaseException):
"""Base class for exceptions used to asynchronously interrupt a
thread's execution. An actor may catch these exceptions in order
to respond gracefully, such as performing any necessary cleanup.
This is *not* a subclass of ``DramatiqError`` to avoid it being
caught unintentionally.
"""
def raise_thread_exception(thread_id, exception):
def __init__(self, parameters, queue_name, prefetch, timeout):
try:
self.logger = get_logger(__name__, type(self))
self.connection = pika.BlockingConnection(parameters=parameters)
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_count=prefetch)
self.iterator = self.channel.consume(queue_name, inactivity_timeout=timeout / 1000)
# We need to keep track of known delivery tags so that
# when connection errors occur and the consumer is reset,
# we don't attempt to send invalid tags to Rabbit since
# pika doesn't handle this very well.
self.known_tags = set()
except (pika.exceptions.AMQPConnectionError,
pika.exceptions.AMQPChannelError) as e:
raise ConnectionClosed(e) from None
def __init__(self, *, max_retries=20, min_backoff=None, max_backoff=None, retry_when=None):
self.logger = get_logger(__name__, type(self))
self.max_retries = max_retries
self.min_backoff = min_backoff or DEFAULT_MIN_BACKOFF
self.max_backoff = max_backoff or DEFAULT_MAX_BACKOFF
self.retry_when = retry_when
def __init__(self, *, max_age=None):
self.logger = get_logger(__name__, type(self))
self.max_age = max_age
def __init__(self, broker, *, queues=None, worker_timeout=1000, worker_threads=8):
self.logger = get_logger(__name__, type(self))
self.broker = broker
self.consumers = {}
self.consumer_whitelist = queues and set(queues)
# Load a small factor more messages than there are workers to
# avoid waiting on network IO as much as possible. The factor
# must be small so we don't starve other workers out.
self.queue_prefetch = QUEUE_PREFETCH or min(worker_threads * 2, 65535)
# Load a large factor more delay messages than there are
# workers as those messages could have far-future etas.
self.delay_prefetch = min(worker_threads * 1000, 65535)
self.workers = []
self.work_queue = PriorityQueue()
self.worker_timeout = worker_timeout
self.worker_threads = worker_threads
def __init__(self, *, broker, consumers, work_queue, worker_timeout):
super().__init__(daemon=True)
self.logger = get_logger(__name__, "WorkerThread")
self.running = False
self.paused = False
self.paused_event = Event()
self.broker = broker
self.consumers = consumers
self.work_queue = work_queue
self.timeout = worker_timeout / 1000
def __init__(self):
self.logger = get_logger(__name__, type(self))
self.delayed_messages = set()
self.message_start_times = {}
def __init__(self, *, broker, queue_name, prefetch, work_queue, worker_timeout):
super().__init__(daemon=True)
self.logger = get_logger(__name__, "ConsumerThread(%s)" % queue_name)
self.running = False
self.paused = False
self.paused_event = Event()
self.consumer = None
self.broker = broker
self.prefetch = prefetch
self.queue_name = queue_name
self.work_queue = work_queue
self.worker_timeout = worker_timeout
self.delay_queue = PriorityQueue()
def __init__(self, worker):
self.logger = get_logger(__name__, type(self))
self.worker = worker