Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
publish = taskcluster_config.secrets["PHABRICATOR"].get("publish", False)
# Check that Redis support is enabled on Heroku
if heroku.in_dyno():
assert self.bus.redis_enabled is True, "Need Redis on Heroku"
community_config = taskcluster_config.secrets.get("taskcluster_community")
test_selection_enabled = taskcluster_config.secrets.get(
"test_selection_enabled", False
)
# Run webserver & pulse on web dyno or single instance
if not heroku.in_dyno() or heroku.in_web_dyno():
# Create web server
self.webserver = WebServer(QUEUE_WEB_BUILDS)
self.webserver.register(self.bus)
# Create pulse listener
exchanges = {}
if taskcluster_config.secrets["autoland_enabled"]:
logger.info("Autoland ingestion is enabled")
# autoland ingestion
exchanges[QUEUE_PULSE_AUTOLAND] = [
(PULSE_TASK_GROUP_RESOLVED, ["#.gecko-level-3.#"])
]
# Create pulse listeners for bugbug test selection task and unit test failures.
if community_config is not None and test_selection_enabled:
exchanges[QUEUE_PULSE_TRY_TASK_END] = [
(PULSE_TASK_COMPLETED, ["#.gecko-level-1.#"]),
(PULSE_TASK_FAILED, ["#.gecko-level-1.#"]),