How to use the kombu.Queue class in kombu

To help you get started, we’ve selected a few kombu.Queue examples, based on popular ways it is used in public projects.

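kombu.Queue declares an AMQP queue and, together with kombu.Exchange, describes how messages are routed to it. A minimal sketch, using illustrative names and the in-memory transport so it runs without a broker:

from kombu import Connection, Exchange, Queue

# declare a queue bound to a topic exchange
media_exchange = Exchange("media", type="topic")
video_queue = Queue("video", exchange=media_exchange, routing_key="video.#")

with Connection("memory://") as conn:
    bound = video_queue(conn.channel())  # bind the queue to a channel
    bound.declare()                      # declares the exchange, queue and binding

The real-world examples below show the same pattern in context.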

github jay-johnson / celery-connectors / tests / mixin_pub_sub.py
from kombu import Exchange, Queue

# ev, build_colorized_logger and build_ssl_options are helpers
# provided by the celery-connectors project this file belongs to
name = ev("APP_NAME", "robopubsub")
log = build_colorized_logger(name=name)


broker_url = ev("PUB_BROKER_URL", "pyamqp://rabbitmq:rabbitmq@localhost:5672//")
exchange_name = ev("PUBLISH_EXCHANGE", "ecomm.api")
exchange_type = ev("PUBLISH_EXCHANGE_TYPE", "topic")
routing_key = ev("PUBLISH_ROUTING_KEY", "ecomm.api.west")
queue_name = ev("PUBLISH_QUEUE", "ecomm.api.west")
prefetch_count = int(ev("PREFETCH_COUNT", "1"))
priority_routing = {"high": queue_name,
                    "low": queue_name}
use_exchange = Exchange(exchange_name, type=exchange_type)
use_routing_key = routing_key
use_queue = Queue(queue_name, exchange=use_exchange, routing_key=routing_key)
task_queues = [
    use_queue
]
ssl_options = build_ssl_options()
transport_options = {}


def send_task_msg(conn=None,
                  data={},
                  exchange=None,     # kombu.Exchange object
                  routing_key=None,  # string
                  priority="high",
                  priority_routing={},
                  serializer="json",
                  **kwargs):
    # ... function body elided in this excerpt
github celery / celery / t / unit / contrib / test_migrate.py
def test_maybe_queue():
    # Mock is unittest.mock.Mock; _maybe_queue comes from celery.contrib.migrate
    app = Mock()
    app.amqp.queues = {'foo': 313}
    assert _maybe_queue(app, 'foo') == 313
    assert _maybe_queue(app, Queue('foo')) == Queue('foo')
github celery / kombu / t / unit / test_messaging.py
def test_revive__with_prefetch_count(self):
    channel = Mock(name='channel')
    b1 = Queue('qname1', self.exchange, 'rkey')
    Consumer(channel, [b1], prefetch_count=14)
    channel.basic_qos.assert_called_with(0, 14, False)
github openstack / heat / heat_integrationtests / functional / test_notifications.py
def setUp(self):
    super(NotificationTest, self).setUp()
    self.exchange = kombu.Exchange('heat', 'topic', durable=False)
    queue = kombu.Queue(exchange=self.exchange,
                        routing_key='notifications.info',
                        exclusive=True)
    self.conn = kombu.Connection(get_url(
        transport.get_transport(cfg.CONF).conf))
    self.ch = self.conn.channel()
    self.queue = queue(self.ch)
    self.queue.declare()
github celery / kombu / t / unit / test_messaging.py
def test_accept__content_disallowed(self):
    conn = Connection('memory://')
    q = Queue('foo', exchange=self.exchange)
    p = conn.Producer()
    p.publish(
        {'complex': object()},
        declare=[q], exchange=self.exchange, serializer='pickle',
    )

    callback = Mock(name='callback')
    with conn.Consumer(queues=[q], callbacks=[callback]) as consumer:
        with pytest.raises(consumer.ContentDisallowed):
            conn.drain_events(timeout=1)
    callback.assert_not_called()
github qingduyu / roe / roeops / settings.py
# requires: from kombu import Exchange, Queue; from celery import platforms
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'  # use django-celery's database scheduler; task schedules are stored in the ORM database you configure
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERYD_MAX_TASKS_PER_CHILD = 400  # each worker process is replaced after executing this many tasks
CELERYD_CONCURRENCY = 20  # worker concurrency, the same number the -c command-line option sets; more is not always better
CELERY_TRACK_STARTED = True
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = 'Asia/Shanghai'
platforms.C_FORCE_ROOT = True  # allow workers to start as root

# import all task modules
CELERY_IMPORTS = ("tasks.ansible",
                  "tasks.deploy",
                  "MysqlOps.tasks", "CMDB.tasks")
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),     # default queue
    Queue('ansible', Exchange('ansible'), routing_key='ansible'),     # ansible queue
    Queue('database', Exchange('database'), routing_key='database'),  # database queue
)
# An exchange determines the message routing rules; each binding carries a routing key.
# The routes below send MysqlOps tasks to the database queue, ansible tasks to the
# ansible queue, and CMDB tasks to the default queue, matched by routing key.
CELERY_ROUTES = {
    'CMDB.tasks.*': {'queue': 'default', 'routing_key': 'default'},
    'MysqlOps.tasks.*': {'queue': 'database', 'routing_key': 'database'},
    'tasks.ansible.AnsibleScripts': {'queue': 'ansible', 'routing_key': 'ansible'},
    'tasks.ansible.AnsiblePlayBook': {'queue': 'ansible', 'routing_key': 'ansible'},
}
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'  # type of the default exchange
CELERY_DEFAULT_ROUTING_KEY = 'default'
github ghosert / VimProject / StudyPyramid / celery / celeryconfig.py
# tasks.add is routed to the queue tasks_queue for both the producer and the consumer
CELERY_ROUTES = {
            'tasks.add': {'queue': 'tasks_queue', 'routing_key': 'tasks_routing'},
            'tasks.mul': {'queue': 'tasks_queue', 'routing_key': 'tasks_routing'},
            #'tasks.add_1': {'queue': 'tasks_queue', 'routing_key': 'tasks_routing_1'},
            #'tasks.add_2': {'queue': 'tasks_queue_2', 'routing_key': 'tasks_routing_2'},
            'tasks.pdf': {'queue': 'tasks_pdf_queue', 'routing_key': 'tasks_pdf_routing'},
        }

# Define exchanges explicitly. Changing the type here requires resetting the
# queue and exchange: 'celery amqp queue.delete tasks_queue' and
# 'celery amqp exchange.delete tasks_exchange'.
tasks_exchange = Exchange('tasks_exchange', type='direct')  # fanout/direct/topic

# For tasks.py to listen on the queue tasks_queue
CELERY_QUEUES = (
            Queue('tasks_queue', tasks_exchange, routing_key='tasks_routing'),
            # Queue('tasks_queue', tasks_exchange, routing_key='tasks_routing_1'),
            # Queue('tasks_queue_2', tasks_exchange, routing_key='tasks_routing_2'),
            # routing_key could be 'tasks.#', '*.tasks.*' if exchange type is 'topic'
            Queue('tasks_pdf_queue', tasks_exchange, routing_key='tasks_pdf_routing'),
        )

# acknowledge only after the task has been executed; False by default
CELERY_ACKS_LATE = True

# The worker will reserve at most one extra task for every active worker process.
# CELERYD_PREFETCH_MULTIPLIER = 1
#
# A multiplier of 1 is good for many long-running tasks; raise it when most tasks
# are short-running. For a mix of long- and short-running tasks, use two queues
# and dedicated workers with a different multiplier value consuming each queue.

# 'pickle', 'json', 'yaml' and 'msgpack' are supported for serialization/deserialization
CELERY_TASK_SERIALIZER = 'pickle'
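Given routes like the ones above, a producer can also pick the queue per call. A minimal sketch, assuming a task named tasks.add as in this config:

# override CELERY_ROUTES for a single call
add.apply_async(args=(2, 2), queue='tasks_queue', routing_key='tasks_routing')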
github FORTH-ICS-INSPIRE / artemis / backend / core / database.py
            # ... earlier exchange declarations elided in this excerpt
            self.mitigation_exchange = Exchange(
                "mitigation", type="direct", durable=False, delivery_mode=1
            )

            # QUEUES
            self.update_queue = Queue(
                "db-bgp-update",
                exchange=self.update_exchange,
                routing_key="update",
                durable=False,
                auto_delete=True,
                max_priority=1,
                consumer_arguments={"x-priority": 1},
            )
            self.withdraw_queue = Queue(
                "db-withdraw-update",
                exchange=self.update_exchange,
                routing_key="withdraw",
                durable=False,
                auto_delete=True,
                max_priority=1,
                consumer_arguments={"x-priority": 1},
            )

            self.hijack_queue = Queue(
                "db-hijack-update-{}".format(uuid()),
                exchange=self.hijack_hashing,
                routing_key="1",
                durable=False,
                auto_delete=True,
                max_priority=1,
                consumer_arguments={"x-priority": 1},
            )
github mozilla / treeherder / treeherder / services / pulse / consumers.py
def bind_to(self, exchange, routing_key):
    if not self.queue:
        self.queue = Queue(
            name=self.queue_name,
            channel=self.connection.channel(),
            exchange=exchange,
            routing_key=routing_key,
            durable=True,
            auto_delete=False,
        )
        self.consumers.append(dict(queues=self.queue,
                                   callbacks=[self.on_message]))
        # just in case the queue does not already exist on Pulse
        self.queue.declare()
    else:
        self.queue.bind_to(exchange=exchange, routing_key=routing_key)

    # get the binding key for this consumer
    binding = self.get_binding_str(exchange.name, routing_key)
github nameko / nameko / nameko / legacy / nova.py
def get_reply_queue(msgid, channel=None):
    exchange = get_reply_exchange(msgid, channel=channel)
    return Queue(
        name=msgid,
        channel=channel,
        exchange=exchange,
        routing_key=msgid,
        durable=False,
        auto_delete=True,
        exclusive=True)
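Putting the pieces together: a minimal end-to-end sketch that publishes to and consumes from a kombu.Queue. Queue and exchange names are illustrative, and the in-memory transport keeps it runnable without a broker:

from kombu import Connection, Exchange, Queue

demo_exchange = Exchange("demo", type="direct")
demo_queue = Queue("demo-queue", exchange=demo_exchange, routing_key="demo")

def on_message(body, message):
    # print and acknowledge each delivery
    print("received:", body)
    message.ack()

with Connection("memory://") as conn:
    producer = conn.Producer(serializer="json")
    # declare=[demo_queue] creates the exchange, queue and binding if missing
    producer.publish({"hello": "world"}, exchange=demo_exchange,
                     routing_key="demo", declare=[demo_queue])
    with conn.Consumer(queues=[demo_queue], callbacks=[on_message]):
        conn.drain_events(timeout=1)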