Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# --- Application identity and logging ---------------------------------------
# NOTE(review): ev(key, default) is presumably an environment lookup with a
# fallback value -- confirm against its definition.
name = ev("APP_NAME", "robopubsub")
log = build_colorized_logger(name=name)

# --- Broker and publish targets ---------------------------------------------
broker_url = ev("PUB_BROKER_URL", "pyamqp://rabbitmq:rabbitmq@localhost:5672//")
exchange_name = ev("PUBLISH_EXCHANGE", "ecomm.api")
exchange_type = ev("PUBLISH_EXCHANGE_TYPE", "topic")
routing_key = ev("PUBLISH_ROUTING_KEY", "ecomm.api.west")
queue_name = ev("PUBLISH_QUEUE", "ecomm.api.west")
prefetch_count = int(ev("PREFETCH_COUNT", "1"))

# Both priority levels currently map to the same queue name.
priority_routing = {"high": queue_name, "low": queue_name}

# --- kombu objects built from the settings above ----------------------------
use_exchange = Exchange(exchange_name, type=exchange_type)
use_routing_key = routing_key
use_queue = Queue(
    queue_name,
    exchange=use_exchange,
    routing_key=use_routing_key,
)
task_queues = [use_queue]

ssl_options = build_ssl_options()
transport_options = {}
def send_task_msg(conn=None,
data={},
exchange=None, # kombu.Exchange object
routing_key=None, # string
priority="high",
priority_routing={},
serializer="json",
**kwargs):
def test_maybe_queue():
    """Check _maybe_queue for both a queue name and a Queue instance."""
    fake_app = Mock()
    fake_app.amqp.queues = {'foo': 313}

    # Given a name, the value registered under that name is expected back.
    looked_up = _maybe_queue(fake_app, 'foo')
    assert looked_up == 313

    # Given a Queue, the result is expected to compare equal to that Queue.
    passed_through = _maybe_queue(fake_app, Queue('foo'))
    assert passed_through == Queue('foo')
def test_revive__with_prefetch_count(self):
    """A Consumer created with prefetch_count=14 must call basic_qos(0, 14, False)."""
    mock_channel = Mock(name='channel')
    queue_under_test = Queue('qname1', self.exchange, 'rkey')

    # Only construction is exercised; the Consumer object itself is not kept.
    Consumer(mock_channel, [queue_under_test], prefetch_count=14)

    mock_channel.basic_qos.assert_called_with(0, 14, False)
def setUp(self):
    """Create the 'heat' topic exchange and a declared, exclusive queue on it."""
    super(NotificationTest, self).setUp()

    self.exchange = kombu.Exchange('heat', 'topic', durable=False)

    # Channel-less queue description; it is bound to a channel further down.
    notification_queue = kombu.Queue(
        exchange=self.exchange,
        routing_key='notifications.info',
        exclusive=True,
    )

    # The connection URL is derived from the configured transport.
    transport_conf = transport.get_transport(cfg.CONF).conf
    self.conn = kombu.Connection(get_url(transport_conf))
    self.ch = self.conn.channel()

    # Calling the queue with the channel gives the object kept on self,
    # which is then declared.
    self.queue = notification_queue(self.ch)
    self.queue.declare()
def test_accept__content_disallowed(self):
    """Draining a pickle-serialized message must raise ContentDisallowed.

    The consumer is created without an explicit ``accept`` list, and the
    registered callback must never be invoked.
    """
    connection = Connection('memory://')
    foo_queue = Queue('foo', exchange=self.exchange)

    # Publish a payload that is serialized with pickle.
    payload = {'complex': object()}
    producer = connection.Producer()
    producer.publish(
        payload,
        declare=[foo_queue],
        exchange=self.exchange,
        serializer='pickle',
    )

    on_message = Mock(name='callback')
    consumer_cm = connection.Consumer(queues=[foo_queue], callbacks=[on_message])
    with consumer_cm as consumer, pytest.raises(consumer.ContentDisallowed):
        connection.drain_events(timeout=1)

    on_message.assert_not_called()
# Use django-celery's default database scheduler; task schedules are stored
# in the ORM database configured for the project.
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERY_TASK_RESULT_EXPIRES = 24 * 60 * 60  # 86400
# Number of tasks a worker executes before it is killed off.
CELERYD_MAX_TASKS_PER_CHILD = 400
# Worker concurrency (same as the -c command-line option); a larger value is
# not necessarily better.
CELERYD_CONCURRENCY = 20
CELERY_TRACK_STARTED = True
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = 'Asia/Shanghai'
# Allow the worker to be started as root.
platforms.C_FORCE_ROOT = True

# Task modules Celery imports.
CELERY_IMPORTS = (
    "tasks.ansible",
    "tasks.deploy",
    "MysqlOps.tasks",
    "CMDB.tasks",
)

# One queue each for default, ansible and database work; every queue uses an
# exchange and a routing key named after the queue itself.
CELERY_QUEUES = tuple(
    Queue(queue_label, Exchange(queue_label), routing_key=queue_label)
    for queue_label in ('default', 'ansible', 'database')
)

# The exchange decides how messages are routed; an exchange has a routing key.
# Routing rules follow. Original note: "task.sql" runs on the database queue,
# and tasks under assets/cron/sched go to the default queue, keyed by the
# routing-key prefix.
# NOTE(review): those task names do not appear in the table below -- confirm
# whether the note is stale.
CELERY_ROUTES = {
    'CMDB.tasks.*': {
        'queue': 'default',
        'routing_key': 'default',
    },
    'MysqlOps.tasks.*': {
        'queue': 'database',
        'routing_key': 'database',
    },
    'tasks.ansible.AnsibleScripts': {
        'queue': 'ansible',
        'routing_key': 'ansible',
    },
    'tasks.ansible.AnsiblePlayBook': {
        'queue': 'ansible',
        'routing_key': 'ansible',
    },
}
CELERY_DEFAULT_QUEUE = 'default'
# Exchange behaviour for the default queue.
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
# Route tasks.add and tasks.mul to tasks_queue, and tasks.pdf to
# tasks_pdf_queue; this applies to both the producer and the consumer.
CELERY_ROUTES = {
    'tasks.add': {
        'queue': 'tasks_queue',
        'routing_key': 'tasks_routing',
    },
    'tasks.mul': {
        'queue': 'tasks_queue',
        'routing_key': 'tasks_routing',
    },
    # 'tasks.add_1': {'queue': 'tasks_queue', 'routing_key': 'tasks_routing_1'},
    # 'tasks.add_2': {'queue': 'tasks_queue_2', 'routing_key': 'tasks_routing_2'},
    'tasks.pdf': {
        'queue': 'tasks_pdf_queue',
        'routing_key': 'tasks_pdf_routing',
    },
}

# Exchanges are defined explicitly. Changing the type here requires resetting
# the queue and exchange first:
#   celery amqp queue.delete tasks_queue
#   celery amqp exchange.delete tasks_exchange
# Possible types: fanout / direct / topic.
tasks_exchange = Exchange('tasks_exchange', type='direct')

# Queues that tasks.py listens on.
CELERY_QUEUES = (
    Queue('tasks_queue', tasks_exchange, routing_key='tasks_routing'),
    # Queue('tasks_queue', tasks_exchange, routing_key='tasks_routing_1'),
    # Queue('tasks_queue_2', tasks_exchange, routing_key='tasks_routing_2'),
    # With a 'topic' exchange the routing_key could be a pattern such as
    # 'tasks.#' or '*.tasks.*'.
    Queue('tasks_pdf_queue', tasks_exchange, routing_key='tasks_pdf_routing'),
)

# Acknowledge a message only after its task has been executed (the default
# is False).
CELERY_ACKS_LATE = True

# With a prefetch multiplier of 1 the worker reserves at most one extra task
# for every active worker process:
# CELERYD_PREFETCH_MULTIPLIER = 1
#
# That suits many long-running tasks; use a larger number for many
# short-running tasks. For a mix of long and short tasks, use two queues with
# dedicated workers, each consuming one queue with its own multiplier value.

# Supported serializers: 'pickle', 'json', 'yaml', 'msgpack'.
# NOTE(review): unpickling can execute arbitrary code -- confirm that every
# producer on this broker is trusted.
CELERY_TASK_SERIALIZER = 'pickle'
)
self.mitigation_exchange = Exchange(
"mitigation", type="direct", durable=False, delivery_mode=1
)
# QUEUES
self.update_queue = Queue(
"db-bgp-update",
exchange=self.update_exchange,
routing_key="update",
durable=False,
auto_delete=True,
max_priority=1,
consumer_arguments={"x-priority": 1},
)
self.withdraw_queue = Queue(
"db-withdraw-update",
exchange=self.update_exchange,
routing_key="withdraw",
durable=False,
auto_delete=True,
max_priority=1,
consumer_arguments={"x-priority": 1},
)
self.hijack_queue = Queue(
"db-hijack-update-{}".format(uuid()),
exchange=self.hijack_hashing,
routing_key="1",
durable=False,
auto_delete=True,
max_priority=1,
# Bind this consumer's queue to `exchange` using `routing_key`.
#
# When self.queue is falsy, a durable, non-auto-delete Queue named
# self.queue_name is created on a channel from self.connection.channel(),
# registered in self.consumers with self.on_message as its callback, and
# declared. Otherwise the existing queue gets an additional binding.
#
# NOTE(review): indentation was lost in this listing, so the nesting described
# above is inferred from the if/else keywords. The function also appears to be
# cut off after `binding` is computed (the value is never used or returned
# here) -- confirm against the complete source.
def bind_to(self, exchange, routing_key):
if not self.queue:
# First binding: create the queue already bound to this exchange/key.
self.queue = Queue(
name=self.queue_name,
channel=self.connection.channel(),
exchange=exchange,
routing_key=routing_key,
durable=True,
auto_delete=False,
)
# Register the queue together with the message callback.
self.consumers.append(dict(queues=self.queue,
callbacks=[self.on_message]))
# just in case the queue does not already exist on Pulse
self.queue.declare()
else:
# Queue already exists: add one more binding to it.
self.queue.bind_to(exchange=exchange, routing_key=routing_key)
# get the binding key for this consumer
binding = self.get_binding_str(exchange.name, routing_key)
def get_reply_queue(msgid, channel=None):
    """Return the reply Queue for the message id *msgid*.

    The queue is named after *msgid*, uses *msgid* as its routing key, and is
    attached to the exchange returned by ``get_reply_exchange``. It is
    created non-durable, auto-delete and exclusive.
    """
    reply_exchange = get_reply_exchange(msgid, channel=channel)
    queue_options = dict(
        name=msgid,
        channel=channel,
        exchange=reply_exchange,
        routing_key=msgid,
        durable=False,
        auto_delete=True,
        exclusive=True,
    )
    return Queue(**queue_options)