Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_pformat_table_current_config(self):
    """Render the current config as a table and check an override shows up.

    A single override entry is applied with ``source="test_source"``; the
    formatted output for ``test_section`` must mention the entry's key, its
    value and the source label.
    """
    override = _tc("test_log_current_config_abc", "test_log_current_config_value")
    with config(override, source="test_source"):
        rendered = pformat_current_config(
            config, as_table=True, sections=["test_section"]
        )
        expected_fragments = (
            "test_log_current_config_abc",
            "test_log_current_config_value",
            "test_source",
        )
        for fragment in expected_fragments:
            assert fragment in rendered
def test_log_current_config(self):
    """Smoke test: logging the current config works while an override is active."""
    override = _tc("test_log_current_config_abc", "test_log_current_config_value")
    with config(override, source="test_source"):
        config.log_current_config()
# NOTE(review): this is the tail of a task-class lookup routine; its `def`
# header and the branch that produced `task_cls` are outside this excerpt.
return task_cls
# we are going to check if we have override/definition in config
# Step 1: a "_type" entry in the task's own config section redirects the lookup
# (the third argument to config.get is presumably the default — confirm).
config_task_type = config.get(task_name, "_type", None)
if config_task_type:
# presumably guards against a section pointing back at itself — verify helper
_validate_no_recursion_in_config(task_name, config_task_type, "_type")
try:
return self._get_task_cls(config_task_type)
except Exception:
# Log which section/_type pair failed, then re-raise the original error.
logger.error(
"Failed to load type required by [%s] using _type=%s",
task_name,
config_task_type,
)
raise
# Step 2: same redirect via a "_from" entry; failures here propagate unlogged.
config_task_type = config.get(task_name, "_from", None)
if config_task_type:
_validate_no_recursion_in_config(task_name, config_task_type, "_from")
return self._get_task_cls(config_task_type)
# Step 3: a dotted name is treated as "<module path>.<task>"; load the module
# part and retry the registry lookup with the full dotted name.
if "." in task_name:
parts = task_name.split(".")
# The last component is popped so `parts` holds only the module path;
# `possible_root_task` itself is not used in the lines visible here.
possible_root_task = parts.pop()
possible_module = ".".join(parts)
# Try to load module and check again for existence
load_python_module(possible_module, "task name '%s'" % task_name)
task_cls = self._get_registered_task_cls(task_name)
if task_cls:
return task_cls
def __init__(self, config_file=None):
    """Load system configs and read the scheduler settings.

    :param config_file: optional explicit value; when falsy, the
        ``[scheduler] config_file`` config entry is used instead.
    """
    config.load_system_configs()
    # A falsy argument (None, "") falls back to the configured value.
    self.config_file = config_file or config.get("scheduler", "config_file")
    self.active_by_default = config.get("scheduler", "active_by_default")
def __init__(self, config_file=None):
    """Load system configs, then pick up scheduler settings from config.

    :param config_file: optional explicit value; the ``[scheduler]
        config_file`` entry is read only when this argument is falsy.
    """
    config.load_system_configs()

    if not config_file:
        config_file = config.get("scheduler", "config_file")
    self.config_file = config_file

    self.active_by_default = config.get("scheduler", "active_by_default")
def initialize_task_id(self, params=None):
    """Build the task signature and store the id/signature on the instance.

    Two ``[task_build]`` flags control what is mixed into the signature
    besides the task name and ``params``:

    * ``sign_with_full_qualified_name`` adds the full task family;
    * ``sign_with_task_code`` adds a signature of the task source code.

    Sets ``task_id``, ``task_signature`` and ``task_signature_source``.

    :param params: passed through unchanged to ``build_signature``.
    """
    task_name = self.task_name
    signature_extras = {}

    if config.getboolean("task_build", "sign_with_full_qualified_name"):
        signature_extras["full_task_family"] = self.task_definition.full_task_family

    if config.getboolean("task_build", "sign_with_task_code"):
        source_code = self.task_definition.task_source_code
        signature_extras["task_code_hash"] = user_friendly_signature(source_code)

    built = build_signature(name=task_name, params=params, extra=signature_extras)

    # Read both values before assigning either attribute.
    new_id, new_signature = built.id, built.signature
    self.task_id = new_id
    self.task_signature = new_signature
    self.task_signature_source = built.signature_source
def use_airflow_connections():
    """Tell whether Airflow connections should be used.

    Returns the result of ``is_airflow_enabled()`` when that is falsy;
    otherwise returns the ``[airflow] use_connections`` boolean from config.
    """
    # Imported inside the function, as in the original code.
    from dbnd._core.configuration.dbnd_config import config

    airflow_enabled = is_airflow_enabled()
    if not airflow_enabled:
        # Hand back the exact falsy value, just like `a and b` would.
        return airflow_enabled
    return config.getboolean("airflow", "use_connections")
def enable_osx_forked_request_calls():
    """Set fork-related environment defaults on macOS.

    Does nothing unless ``is_osx`` is truthy and the ``[core] fix_env_on_osx``
    config flag is enabled. When active, ``no_proxy`` and
    ``OBJC_DISABLE_INITIALIZE_FORK_SAFETY`` are set only if they are not
    already present in the environment.
    """
    if not is_osx:
        return

    from dbnd._core.configuration.dbnd_config import config

    if config.getboolean("core", "fix_env_on_osx"):
        env_defaults = (
            ("no_proxy", "*"),
            ("OBJC_DISABLE_INITIALIZE_FORK_SAFETY", "yes"),
        )
        for env_key, env_value in env_defaults:
            # Never overwrite a value the user has already exported.
            if env_key not in os.environ:
                os.environ[env_key] = env_value
def set_airflow_sql_conn_from_dbnd_config():
    """Export Airflow's SQL connection and Fernet key from dbnd config.

    For each of ``sql_alchemy_conn`` and ``fernet_key`` the ``[airflow]``
    section is read first; the literal value ``"dbnd"`` means "use the entry
    of the same name from ``[core]``". A non-empty result is written to the
    matching ``AIRFLOW__CORE__*`` environment variable unless that variable
    is already set.
    """
    logging.debug("updating airflow config from dbnd config")
    from dbnd._core.configuration.dbnd_config import config as dbnd_config

    conn_url = dbnd_config.get("airflow", "sql_alchemy_conn")
    if conn_url == "dbnd":
        logging.debug("updating airflow sql from dbnd core.sql_alchemy_conn")
        conn_url = dbnd_config.get("core", "sql_alchemy_conn")
    if conn_url:
        # setdefault leaves an already-exported value untouched.
        os.environ.setdefault("AIRFLOW__CORE__SQL_ALCHEMY_CONN", conn_url)

    key = dbnd_config.get("airflow", "fernet_key")
    if key == "dbnd":
        key = dbnd_config.get("core", "fernet_key")
    if key:
        os.environ.setdefault("AIRFLOW__CORE__FERNET_KEY", key)
def new_dbnd_context(conf=None, name=None, **kwargs):
    # type: (...) -> ContextManager[DatabandContext]
    """Yield a new ``DatabandContext`` while ``conf`` is applied as config.

    ``conf`` is layered onto the config with ``source="inplace"``; the
    context is created with ``allow_override=True`` and any extra ``kwargs``.
    NOTE(review): presumably wrapped by ``contextlib.contextmanager`` at the
    definition site (decorator not visible here) — confirm.
    """
    with config(config_values=conf, source="inplace"), DatabandContext.new_context(
        name=name, allow_override=True, **kwargs
    ) as databand_context:
        yield databand_context