How to use the dbnd._core.configuration.dbnd_config.config function in dbnd

To help you get started, we’ve selected a few dbnd examples that show the most common ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github databand-ai / dbnd / modules / dbnd / test_dbnd / configuration / test_config_layers.py View on Github external
def test_pformat_table_current_config(self):
    """Check that the table rendering of the current config shows key, value and source.

    A layer built by ``_tc`` is activated with ``source="test_source"``; the
    text returned by ``pformat_current_config`` (table mode, limited to the
    ``test_section`` section) must contain the key, the value and the source.
    """
    layer = _tc("test_log_current_config_abc", "test_log_current_config_value")
    with config(layer, source="test_source"):
        rendered = pformat_current_config(
            config, as_table=True, sections=["test_section"]
        )
        # Every one of these fragments has to appear in the rendered table.
        for expected in (
            "test_log_current_config_abc",
            "test_log_current_config_value",
            "test_source",
        ):
            assert expected in rendered
github databand-ai / dbnd / modules / dbnd / test_dbnd / configuration / test_config_layers.py View on Github external
def test_log_current_config(self):
    """Smoke test: ``config.log_current_config()`` runs while a test layer is active.

    There are no assertions; the test passes when the call does not raise.
    """
    layer = _tc("test_log_current_config_abc", "test_log_current_config_value")
    with config(layer, source="test_source"):
        config.log_current_config()
github databand-ai / dbnd / modules / dbnd / src / dbnd / _core / task_build / task_registry.py View on Github external
return task_cls

        # we are going to check if we have override/definition in config
        config_task_type = config.get(task_name, "_type", None)
        if config_task_type:
            _validate_no_recursion_in_config(task_name, config_task_type, "_type")
            try:
                return self._get_task_cls(config_task_type)
            except Exception:
                logger.error(
                    "Failed to load type required by [%s] using _type=%s",
                    task_name,
                    config_task_type,
                )
                raise
        config_task_type = config.get(task_name, "_from", None)
        if config_task_type:
            _validate_no_recursion_in_config(task_name, config_task_type, "_from")
            return self._get_task_cls(config_task_type)

        if "." in task_name:
            parts = task_name.split(".")
            possible_root_task = parts.pop()
            possible_module = ".".join(parts)

            # Try to load module and check again for existence
            load_python_module(possible_module, "task name '%s'" % task_name)

            task_cls = self._get_registered_task_cls(task_name)
            if task_cls:
                return task_cls
github databand-ai / dbnd / modules / dbnd / src / dbnd / _core / configuration / scheduler_file_config_loader.py View on Github external
def __init__(self, config_file=None):
    """Load the system configs and read the scheduler settings.

    :param config_file: explicit scheduler config file. When it is falsy,
        the ``[scheduler] config_file`` config value is used instead.
    """
    config.load_system_configs()
    # Any falsy ``config_file`` (None, "") falls back to the configured value.
    self.config_file = config_file or config.get("scheduler", "config_file")
    self.active_by_default = config.get("scheduler", "active_by_default")
github databand-ai / dbnd / modules / dbnd / src / dbnd / _core / configuration / scheduler_file_config_loader.py View on Github external
def __init__(self, config_file=None):
    """Initialize from the system configs and the ``[scheduler]`` section.

    :param config_file: scheduler config file to use; a falsy value means
        "take ``[scheduler] config_file`` from the config".
    """
    config.load_system_configs()

    if not config_file:
        # No explicit file was given: use the one named in the config.
        config_file = config.get("scheduler", "config_file")
    self.config_file = config_file

    self.active_by_default = config.get("scheduler", "active_by_default")
github databand-ai / dbnd / modules / dbnd / src / dbnd / _core / task_ctrl / task_meta.py View on Github external
def initialize_task_id(self, params=None):
    """Build the task signature and store the resulting id on this object.

    The signature is built by ``build_signature`` from the task name,
    ``params`` and an ``extra`` dict controlled by two ``[task_build]`` flags:

    * ``sign_with_full_qualified_name`` adds ``full_task_family``;
    * ``sign_with_task_code`` adds ``task_code_hash``, the result of
      ``user_friendly_signature`` applied to the task source code.

    Sets ``task_id``, ``task_signature`` and ``task_signature_source``.

    :param params: passed through unchanged to ``build_signature``.
    """
    task_name = self.task_name
    signature_extras = {}

    if config.getboolean("task_build", "sign_with_full_qualified_name"):
        signature_extras["full_task_family"] = self.task_definition.full_task_family

    if config.getboolean("task_build", "sign_with_task_code"):
        source_code = self.task_definition.task_source_code
        signature_extras["task_code_hash"] = user_friendly_signature(source_code)

    built = build_signature(name=task_name, params=params, extra=signature_extras)
    self.task_id, self.task_signature = built.id, built.signature
    self.task_signature_source = built.signature_source
github databand-ai / dbnd / modules / dbnd / src / dbnd / _core / plugin / dbnd_plugins.py View on Github external
def use_airflow_connections():
    """Tell whether Airflow connections should be used.

    Returns the (falsy) result of ``is_airflow_enabled()`` when Airflow is not
    enabled; otherwise returns the ``[airflow] use_connections`` boolean.
    """
    # Imported inside the function, exactly as the original does.
    from dbnd._core.configuration.dbnd_config import config

    airflow_enabled = is_airflow_enabled()
    if not airflow_enabled:
        # Hand back the original falsy value rather than coercing it to False.
        return airflow_enabled
    return config.getboolean("airflow", "use_connections")
github databand-ai / dbnd / modules / dbnd / src / dbnd / _core / utils / platform / osx_compatible / requests_in_forked_process.py View on Github external
def enable_osx_forked_request_calls():
    """Set default environment variables for forked processes on OSX.

    Does nothing unless ``is_osx`` is truthy and the ``[core] fix_env_on_osx``
    config flag is enabled. When active, ``no_proxy`` and
    ``OBJC_DISABLE_INITIALIZE_FORK_SAFETY`` are set only if they are not
    already present in the environment; existing values are left alone.
    """
    if not is_osx:
        return

    # The config is imported only once we know we are on OSX.
    from dbnd._core.configuration.dbnd_config import config

    if config.getboolean("core", "fix_env_on_osx"):
        env_defaults = (
            ("no_proxy", "*"),
            ("OBJC_DISABLE_INITIALIZE_FORK_SAFETY", "yes"),
        )
        for variable, value in env_defaults:
            # setdefault writes the value only when the variable is absent.
            os.environ.setdefault(variable, value)
github databand-ai / dbnd / modules / dbnd-airflow / src / dbnd_airflow / bootstrap.py View on Github external
def set_airflow_sql_conn_from_dbnd_config():
    """Export Airflow's SQL connection and fernet key from the dbnd config.

    For each of ``sql_alchemy_conn`` and ``fernet_key`` the ``[airflow]``
    value is read; the literal value ``"dbnd"`` means "use the ``[core]``
    value of the same name". A truthy result is written to
    ``AIRFLOW__CORE__SQL_ALCHEMY_CONN`` / ``AIRFLOW__CORE__FERNET_KEY`` only
    when that environment variable is not already set.
    """
    logging.debug("updating airflow config from dbnd config")
    from dbnd._core.configuration.dbnd_config import config as dbnd_config

    conn_string = dbnd_config.get("airflow", "sql_alchemy_conn")
    if conn_string == "dbnd":
        logging.debug("updating airflow sql from dbnd core.sql_alchemy_conn")
        conn_string = dbnd_config.get("core", "sql_alchemy_conn")
    if conn_string:
        # setdefault keeps any value already present in the environment.
        os.environ.setdefault("AIRFLOW__CORE__SQL_ALCHEMY_CONN", conn_string)

    key = dbnd_config.get("airflow", "fernet_key")
    if key == "dbnd":
        key = dbnd_config.get("core", "fernet_key")
    if key:
        os.environ.setdefault("AIRFLOW__CORE__FERNET_KEY", key)
github databand-ai / dbnd / modules / dbnd / src / dbnd / _core / context / databand_context.py View on Github external
def new_dbnd_context(conf=None, name=None, **kwargs):
    # type: (...) -> ContextManager[DatabandContext]
    """Yield a new ``DatabandContext`` created under an in-place config layer.

    ``conf`` is applied through ``config(config_values=conf, source="inplace")``
    and stays active while the context is open. ``name`` and any extra keyword
    arguments go to ``DatabandContext.new_context``, which is always called
    with ``allow_override=True``.

    NOTE(review): this is a generator; it is presumably wrapped by a
    context-manager decorator that is outside this view. Confirm.
    """
    # One multi-item ``with`` enters and exits in the same order as nesting.
    with config(config_values=conf, source="inplace"), DatabandContext.new_context(
        name=name, allow_override=True, **kwargs
    ) as dbnd_ctx:
        yield dbnd_ctx