Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:param include_password : Format the connection for ouput by setting this True
:param ssl : Flag to use rediss output
"""
server = CONFIG.results_backend.server
password_file = ""
urlbase = "rediss" if ssl else "redis"
try:
port = CONFIG.results_backend.port
except (KeyError, AttributeError):
port = 6379
LOG.debug(f"Results backend: redis using default port = {port}")
try:
db_num = CONFIG.results_backend.db_num
except (KeyError, AttributeError):
db_num = 0
LOG.debug(f"Results backend: redis using default db_num = {db_num}")
try:
username = CONFIG.results_backend.username
except (KeyError, AttributeError):
username = ""
try:
password_file = CONFIG.results_backend.password
try:
password = get_backend_password(password_file, certs_path=certs_path)
except IOError:
password = CONFIG.results_backend.password
:param include_password : The connection can be formatted for output by
setting this to True
"""
try:
return CONFIG.broker.url
except AttributeError:
pass
try:
broker = CONFIG.broker.name.lower()
except AttributeError:
broker = ""
try:
config_path = CONFIG.celery.certs
config_path = os.path.abspath(os.path.expanduser(config_path))
except AttributeError:
config_path = None
if broker not in BROKERS:
raise ValueError(f"Error: {broker} is not a supported broker.")
if broker == "rabbitmq" or broker == "amqps":
return get_rabbit_connection(config_path, include_password, conn="amqps")
elif broker == "amqp":
return get_rabbit_connection(config_path, include_password, conn="amqp")
elif broker == "redis+socket":
return get_redissock_connection(config_path, include_password)
# initialize app with essential properties
app = Celery(
"merlin",
broker=BROKER_URI,
backend=RESULTS_BACKEND_URI,
broker_use_ssl=broker_ssl,
redis_backend_use_ssl=results_ssl,
task_routes=(route_for_task,),
)
# load merlin config defaults
app.conf.update(**celeryconfig.DICT)
# load config overrides from app.yaml
if (
(not hasattr(CONFIG.celery, "override"))
or (CONFIG.celery.override is None)
or (len(nested_namespace_to_dicts(CONFIG.celery.override)) == 0)
):
LOG.debug("Skipping celery config override; 'celery.override' field is empty.")
else:
override_dict = nested_namespace_to_dicts(CONFIG.celery.override)
override_str = ""
i = 0
for k, v in override_dict.items():
if k not in str(app.conf.__dict__):
raise ValueError(f"'{k}' is not a celery configuration.")
override_str += f"\t{k}:\t{v}"
if i != len(override_dict) - 1:
override_str += "\n"
i += 1
LOG.info(
app = Celery(
"merlin",
broker=BROKER_URI,
backend=RESULTS_BACKEND_URI,
broker_use_ssl=broker_ssl,
redis_backend_use_ssl=results_ssl,
task_routes=(route_for_task,),
)
# load merlin config defaults
app.conf.update(**celeryconfig.DICT)
# load config overrides from app.yaml
if (
(not hasattr(CONFIG.celery, "override"))
or (CONFIG.celery.override is None)
or (len(nested_namespace_to_dicts(CONFIG.celery.override)) == 0)
):
LOG.debug("Skipping celery config override; 'celery.override' field is empty.")
else:
override_dict = nested_namespace_to_dicts(CONFIG.celery.override)
override_str = ""
i = 0
for k, v in override_dict.items():
if k not in str(app.conf.__dict__):
raise ValueError(f"'{k}' is not a celery configuration.")
override_str += f"\t{k}:\t{v}"
if i != len(override_dict) - 1:
override_str += "\n"
i += 1
LOG.info(
f"Overriding default celery config with 'celery.override' in 'app.yaml':\n{override_str}"
try:
return CONFIG.results_backend.url
except AttributeError:
pass
try:
backend = CONFIG.results_backend.name.lower()
except AttributeError:
backend = ""
if backend not in BACKENDS:
msg = f"'{backend}' is not a supported results backend"
raise ValueError(msg)
try:
certs_path = CONFIG.celery.certs
certs_path = os.path.abspath(os.path.expanduser(certs_path))
except AttributeError:
certs_path = None
if "mysql" in backend:
return get_mysql(certs_path=certs_path, include_password=include_password)
if "sqlite" in backend:
return SQLITE_CONNECTION_STRING
if backend == "redis":
return get_redis(certs_path=certs_path, include_password=include_password)
if backend == "rediss":
return get_redis(
certs_path=certs_path, include_password=include_password, ssl=True