Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_str_interpolation(self):
    """Check that a ``@python://`` value stored in config reads back as ``"from_a"``.

    A temporary config layer sets ``[b] a`` to a ``@python://`` reference
    pointing at ``test_dbnd.configuration.test_config_layers._a``; inside
    that layer, ``config.get("b", "a")`` must yield ``"from_a"``.
    """
    # Dotted path the "@python://" reference points at.
    target_path = "test_dbnd.configuration.test_config_layers._a"
    layer_values = {"b": {"a": "@python://%s" % target_path}}

    with config(layer_values):
        resolved = config.get("b", "a")
        assert resolved == "from_a"
def set_airflow_sql_conn_from_dbnd_config():
    """Export the SQL connection string and Fernet key from dbnd config to the environment.

    For each of ``sql_alchemy_conn`` and ``fernet_key`` the value is read from
    the ``[airflow]`` section; the literal value ``"dbnd"`` redirects the
    lookup to the same key in ``[core]``. A non-empty result is written to
    ``AIRFLOW__CORE__SQL_ALCHEMY_CONN`` / ``AIRFLOW__CORE__FERNET_KEY``
    respectively, but only when that variable is not already set.
    """
    logging.debug("updating airflow config from dbnd config")
    from dbnd._core.configuration.dbnd_config import config as dbnd_config

    conn = dbnd_config.get("airflow", "sql_alchemy_conn")
    if conn == "dbnd":
        logging.debug("updating airflow sql from dbnd core.sql_alchemy_conn")
        conn = dbnd_config.get("core", "sql_alchemy_conn")
    if conn:
        # setdefault leaves a value that is already in the environment untouched.
        os.environ.setdefault("AIRFLOW__CORE__SQL_ALCHEMY_CONN", conn)

    key = dbnd_config.get("airflow", "fernet_key")
    if key == "dbnd":
        key = dbnd_config.get("core", "fernet_key")
    if key:
        os.environ.setdefault("AIRFLOW__CORE__FERNET_KEY", key)
def set_airflow_sql_conn_from_dbnd_config():
    """Populate two Airflow environment variables from the dbnd configuration.

    ``[airflow] sql_alchemy_conn`` and ``[airflow] fernet_key`` are read; when
    either holds the placeholder ``"dbnd"`` the ``[core]`` value of the same
    key is used instead. Truthy results are stored in
    ``AIRFLOW__CORE__SQL_ALCHEMY_CONN`` and ``AIRFLOW__CORE__FERNET_KEY``
    unless the variable already exists in ``os.environ``.
    """
    logging.debug("updating airflow config from dbnd config")
    from dbnd._core.configuration.dbnd_config import config as dbnd_config

    sql_env_name = "AIRFLOW__CORE__SQL_ALCHEMY_CONN"
    fernet_env_name = "AIRFLOW__CORE__FERNET_KEY"

    # --- SQL connection string ---
    sql_value = dbnd_config.get("airflow", "sql_alchemy_conn")
    if sql_value == "dbnd":
        logging.debug("updating airflow sql from dbnd core.sql_alchemy_conn")
        sql_value = dbnd_config.get("core", "sql_alchemy_conn")
    if sql_value:
        if sql_env_name not in os.environ:
            os.environ[sql_env_name] = sql_value

    # --- Fernet key ---
    fernet_value = dbnd_config.get("airflow", "fernet_key")
    if fernet_value == "dbnd":
        fernet_value = dbnd_config.get("core", "fernet_key")
    if fernet_value:
        if fernet_env_name not in os.environ:
            os.environ[fernet_env_name] = fernet_value
# Context-entry hook: fires the pre-init plugin hook and the user-configured
# pre-init function, then (when the context is not yet initialized) loads
# user modules and applies the system configuration overrides.
# NOTE(review): indentation was lost in this excerpt, so how far the
# `if not self.initialized_context:` body extends cannot be determined from
# the visible lines — confirm against the real file.
def _on_enter(self):
# Notify plugins first, then run the function named by [core] user_pre_init.
pm.hook.dbnd_on_pre_init_context(ctx=self)
run_user_func(config.get("core", "user_pre_init"))
# if we are deserialized - we don't need to run this code again.
if not self.initialized_context:
# noinspection PyTypeChecker
# Module supplied on the instance (labelled "--module" for load_python_module).
if self._module:
load_python_module(self._module, "--module")
# Module named in the config file; loaded only when autoloading is enabled.
module_from_config = config.get("databand", "module")
if self._autoload_modules and module_from_config:
load_python_module(
module_from_config, "config file (see [databand].module)"
)
# will be called from singleton context manager
# we want to be able to catch all "new" inline airflow operators
self.system_settings = DatabandSystemConfig()
# Inline overrides are applied to self.config under source "dbnd.conf".
if self.system_settings.conf:
self.config.set_values(self.system_settings.conf, source="dbnd.conf")
# Overrides read from config file(s) are applied under source "dbnd__databand__conf".
if self.system_settings.conf_file:
conf_file = read_from_config_files(self.system_settings.conf_file)
self.config.set_values(conf_file, source="dbnd__databand__conf")
# NOTE(review): DatabandSettings is imported but not used in the visible
# lines — the excerpt appears to be cut off here.
from dbnd._core.settings import DatabandSettings
# Resolve a task class for `task_name`: the registered classes are checked
# first, then the config section named `task_name` is consulted for a
# `_type` or `_from` entry naming another task to resolve recursively.
# NOTE(review): the excerpt ends at the `_from` branch; nothing is returned
# in the visible lines when no branch matches, and the function may continue
# beyond what is shown.
def _get_task_cls(self, task_name):
# NOTE(review): load_python_module is not used in the visible lines —
# presumably used further down; confirm against the real file.
from dbnd._core.utils.basics.load_python_module import load_python_module
# Fast path: the name is already registered.
task_cls = self._get_registered_task_cls(task_name)
if task_cls:
return task_cls
# we are going to check if we have override/definition in config
config_task_type = config.get(task_name, "_type", None)
if config_task_type:
# presumably rejects a section that points back at itself — verify in helper
_validate_no_recursion_in_config(task_name, config_task_type, "_type")
try:
return self._get_task_cls(config_task_type)
except Exception:
# Log which section/_type failed, then re-raise the original error.
logger.error(
"Failed to load type required by [%s] using _type=%s",
task_name,
config_task_type,
)
raise
# Same redirect via `_from`, but failures propagate without extra logging.
config_task_type = config.get(task_name, "_from", None)
if config_task_type:
_validate_no_recursion_in_config(task_name, config_task_type, "_from")
return self._get_task_cls(config_task_type)