How to use the dbnd._core.configuration.dbnd_config.config.get function in dbnd

To help you get started, we’ve selected a few dbnd examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github databand-ai / dbnd / modules / dbnd / test_dbnd / configuration / test_config_layers.py View on Github external
def test_str_interpolation(self):
        with config(
            {
                "b": dict(
                    a="@python://%s" % "test_dbnd.configuration.test_config_layers._a"
                )
            }
        ):
            assert config.get("b", "a") == "from_a"
github databand-ai / dbnd / modules / dbnd-airflow / src / dbnd_airflow / airflow_extensions / airflow_config.py View on Github external
def set_airflow_sql_conn_from_dbnd_config():
    logging.debug("updating airflow config from dbnd config")
    from dbnd._core.configuration.dbnd_config import config as dbnd_config

    sql_alchemy_conn = dbnd_config.get("airflow", "sql_alchemy_conn")
    if sql_alchemy_conn == "dbnd":
        logging.debug("updating airflow sql from dbnd core.sql_alchemy_conn")
        sql_alchemy_conn = dbnd_config.get("core", "sql_alchemy_conn")

    if sql_alchemy_conn and "AIRFLOW__CORE__SQL_ALCHEMY_CONN" not in os.environ:
        os.environ["AIRFLOW__CORE__SQL_ALCHEMY_CONN"] = sql_alchemy_conn

    fernet_key = dbnd_config.get("airflow", "fernet_key")
    if fernet_key == "dbnd":
        fernet_key = dbnd_config.get("core", "fernet_key")
    if fernet_key and "AIRFLOW__CORE__FERNET_KEY" not in os.environ:
        os.environ["AIRFLOW__CORE__FERNET_KEY"] = fernet_key
github databand-ai / dbnd / modules / dbnd-airflow / src / dbnd_airflow / airflow_extensions / airflow_config.py View on Github external
def set_airflow_sql_conn_from_dbnd_config():
    logging.debug("updating airflow config from dbnd config")
    from dbnd._core.configuration.dbnd_config import config as dbnd_config

    sql_alchemy_conn = dbnd_config.get("airflow", "sql_alchemy_conn")
    if sql_alchemy_conn == "dbnd":
        logging.debug("updating airflow sql from dbnd core.sql_alchemy_conn")
        sql_alchemy_conn = dbnd_config.get("core", "sql_alchemy_conn")

    if sql_alchemy_conn and "AIRFLOW__CORE__SQL_ALCHEMY_CONN" not in os.environ:
        os.environ["AIRFLOW__CORE__SQL_ALCHEMY_CONN"] = sql_alchemy_conn

    fernet_key = dbnd_config.get("airflow", "fernet_key")
    if fernet_key == "dbnd":
        fernet_key = dbnd_config.get("core", "fernet_key")
    if fernet_key and "AIRFLOW__CORE__FERNET_KEY" not in os.environ:
        os.environ["AIRFLOW__CORE__FERNET_KEY"] = fernet_key
github databand-ai / dbnd / modules / dbnd / src / dbnd / _core / context / databand_context.py View on Github external
def _on_enter(self):
        pm.hook.dbnd_on_pre_init_context(ctx=self)
        run_user_func(config.get("core", "user_pre_init"))
        # if we are deserialized - we don't need to run this code again.
        if not self.initialized_context:
            # noinspection PyTypeChecker
            if self._module:
                load_python_module(self._module, "--module")

            module_from_config = config.get("databand", "module")
            if self._autoload_modules and module_from_config:
                load_python_module(
                    module_from_config, "config file (see [databand].module)"
                )

            # will be called from singleton context manager
            # we want to be able to catch all "new" inline airflow operators
            self.system_settings = DatabandSystemConfig()
            if self.system_settings.conf:
                self.config.set_values(self.system_settings.conf, source="dbnd.conf")
            if self.system_settings.conf_file:
                conf_file = read_from_config_files(self.system_settings.conf_file)
                self.config.set_values(conf_file, source="dbnd__databand__conf")

            from dbnd._core.settings import DatabandSettings
github databand-ai / dbnd / modules / dbnd / src / dbnd / _core / task_build / task_registry.py View on Github external
def _get_task_cls(self, task_name):
        from dbnd._core.utils.basics.load_python_module import load_python_module

        task_cls = self._get_registered_task_cls(task_name)
        if task_cls:
            return task_cls

        # we are going to check if we have override/definition in config
        config_task_type = config.get(task_name, "_type", None)
        if config_task_type:
            _validate_no_recursion_in_config(task_name, config_task_type, "_type")
            try:
                return self._get_task_cls(config_task_type)
            except Exception:
                logger.error(
                    "Failed to load type required by [%s] using _type=%s",
                    task_name,
                    config_task_type,
                )
                raise
        config_task_type = config.get(task_name, "_from", None)
        if config_task_type:
            _validate_no_recursion_in_config(task_name, config_task_type, "_from")
            return self._get_task_cls(config_task_type)