How to use the dbnd._core.parameter.parameter_builder.parameter function in dbnd

To help you get started, we’ve selected a few dbnd examples, based on popular ways it is used in public projects.

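All of the snippets below exercise the same fluent builder API. As a quick orientation, here is a minimal sketch of the most common patterns; the MyConfig class and its parameter names are illustrative, not taken from dbnd itself. Indexing with [type] declares the value type, .value(x) sets a default, and .help(...) is a shorthand for a description.

from dbnd._core.parameter.parameter_builder import parameter
from dbnd._core.task import config


class MyConfig(config.Config):
    """Illustrative configuration section (not part of dbnd)."""

    _conf__task_family = "my_section"  # the config section this class maps to

    # [type] declares the parameter's value type
    host = parameter(description="service host name")[str]

    # .value(x) sets a default value
    retries = parameter(description="number of retry attempts").value(3)

    # .help(...) is a shorthand for description=...
    verbose = parameter.help("emit verbose output").value(False)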

github databand-ai / dbnd / modules / dbnd / src / dbnd / _core / settings / env.py
task_env_param = parameter(scope=ParameterScope.children)


class EnvConfig(Config):
    """Databand's environment configuration"""

    _conf__task_family = "env"
    cloud_type = parameter(description="cloud type: gcp/aws/")[str]

    env_label = parameter(
        default=EnvLabel.dev, description="environment type: dev/int/prod"
    )[
        str
    ]  # label

    production = parameter(
        description="indicates that environment is production"
    ).value(False)

    conn_id = parameter(default=None, description="cloud connection settings")[str]

    # MAIN OUTPUT FOLDER
    root = parameter(description="Data outputs location").folder[DirTarget]

    # DATABAND SYSTEM FOLDERS
    dbnd_root = parameter(description="DBND system outputs location").output.folder(
        default=None
    )[DirTarget]
    dbnd_local_root = parameter(
        description="DBND home for the local engine environment"
    ).output.folder()[DirTarget]
    dbnd_data_sync_root = parameter(
        description="Rooted directory for target syncing against remote engine"
    ).output.folder()[DirTarget]
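The EnvConfig snippet above also demonstrates folder- and output-typed parameters. A minimal sketch of those two patterns; MyEnvConfig is an illustrative name, and the imports follow the engine.py snippet further down:

from dbnd._core.parameter.parameter_builder import parameter
from dbnd._core.task import config
from targets import DirTarget


class MyEnvConfig(config.Config):
    _conf__task_family = "my_env"

    # .folder marks the parameter as a directory, typed as DirTarget
    data_root = parameter(description="data outputs location").folder[DirTarget]

    # .output.folder(...) declares an output folder with an explicit default
    system_root = parameter(description="system outputs location").output.folder(
        default=None
    )[DirTarget]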
github databand-ai / dbnd / modules / dbnd / src / dbnd / _core / settings / run.py
"  see docs for more options",
    )[str]

    submit_driver = parameter(
        description="override env.submit_driver for a specific environment"
    ).none[bool]
    submit_tasks = parameter(
        description="override env.submit_tasks for a specific environment"
    ).none[bool]

    enable_airflow_kubernetes = parameter(
        default=True,
        description="Enable use of the kubernetes executor for kubernetes engine submission",
    )[bool]

    execution_date = parameter(default=None, description="Override execution date")[
        datetime
    ]

    # Execution specific
    id = parameter(default=None, description="The list of task ids to run")[List[str]]
    task = parameter(
        default=None, description="Run only specified tasks (regular expression)"
    )[str]

    ignore_dependencies = parameter(
        description="Skip upstream task dependencies, run only tasks matching the regex"
    ).value(False)
    ignore_first_depends_on_past = parameter(
        description="Ignore depends_on_past dependencies for the first set of tasks only"
    ).value(False)
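The run.py snippet above adds two more patterns: .none declares a parameter whose default is None while keeping its declared type, and typing generics such as List[str] work as value types. A minimal sketch; MyRunConfig and its fields are illustrative:

from typing import List

from dbnd._core.parameter.parameter_builder import parameter
from dbnd._core.task import config


class MyRunConfig(config.Config):
    # .none: optional parameter, default None, still typed as bool
    dry_run = parameter(description="run without side effects").none[bool]

    # a typing generic works as the value type
    task_ids = parameter(default=None, description="task ids to run")[List[str]]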
github databand-ai / dbnd / modules / dbnd / src / dbnd / _core / run / databand_run.py
class _DbndDriverTask(Task):
    _conf__no_child_params = True
    task_is_system = True
    task_in_memory_outputs = True

    is_driver = parameter[bool]
    is_submitter = parameter[bool]
    execution_date = parameter[datetime]
    sends_heartbeat = parameter[bool]

    host_engine = parameter[EngineConfig]
    target_engine = parameter[EngineConfig]

    task_executor_type = parameter[str]

    # all paths are system outputs; we don't want to check whether they exist
    local_driver_root = output(system=True)[Target]
    local_driver_log = output(system=True)[Target]

    remote_driver_root = output(system=True)[Target]
    driver_dump = output(system=True)[Target]

    def _build_submit_task(self, run):
        if run.root_task:
            raise DatabandRuntimeError(
                "Can't send to remote execution task created via code, only command line is supported"
            )

        # don't describe in the local run, do it in the remote run
        settings = self.settings
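The _DbndDriverTask snippet above shows the bare form parameter[type], which declares a required parameter with no default, alongside output(...) for task outputs. A minimal sketch under the same pattern; MyDriverTask is an illustrative name, and the top-level dbnd imports are assumed here rather than shown in the snippet:

import datetime

from dbnd import Task, output, parameter  # assumed top-level imports
from targets import Target


class MyDriverTask(Task):
    # bare parameter[type] declares a required parameter with no default
    is_driver = parameter[bool]
    execution_date = parameter[datetime.datetime]

    # system outputs: internal paths whose existence is not checked
    local_log = output(system=True)[Target]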
github databand-ai / dbnd / modules / dbnd / src / dbnd / _core / settings / engine.py
)
from dbnd._core.parameter.parameter_builder import parameter
from dbnd._core.task import config
from targets import DirTarget


if typing.TYPE_CHECKING:
    from dbnd._core.run.databand_run import DatabandRun

logger = logging.getLogger(__name__)


class EngineConfig(config.Config):
    """Databand's engine configuration (where tasks are executed)"""

    require_submit = parameter(
        description="Should the task engine be forced to submit tasks"
    ).value(False)

    dbnd_local_root = parameter(
        default=None, description="Local dbnd home directory at the engine environment"
    )[DirTarget]

    dbnd_executable = parameter(
        default=[sys.executable, "-m", "dbnd"],
        description="'dbnd' executable path at engine environment",
    )[typing.List[str]]

    def cleanup_after_run(self):
        pass

    def submit_to_engine_task(self, env, task_name, args, interactive=True):
github databand-ai / dbnd / modules / dbnd / src / dbnd / _core / settings / log.py
_conf__task_family = "log"
    disabled = parameter(description="Should logging be disabled").value(False)
    capture_stdout_stderr = parameter(
        description="Should the logger retransmit all output written to stdout/stderr"
    ).value(True)
    capture_task_run_log = parameter.help("Capture task output into log").value(True)

    override_airflow_logging_on_task_run = parameter(
        description="Replace airflow logger with databand logger"
    ).value(True)
    support_jupiter = parameter(
        description="Support logging output to the Jupyter UI"
    ).value(True)

    level = parameter(description="Logging level: DEBUG/INFO/WARN/ERROR").value("INFO")
    formatter = parameter(
        description="Log formatting string (logging library convention)"
    )[str]
    formatter_colorlog = parameter(
        description="Log formatting string (logging library convention)"
    )[str]
    formatter_simple = parameter(
        description="Log formatting string (logging library convention)"
    )[str]

    console_formatter_name = parameter(
        description="The name of the formatter logging to console output"
    )[str]
    file_formatter_name = parameter(
        description="The name of the formatter logging to file output"
    )[str]
github databand-ai / dbnd / modules / dbnd / src / dbnd / _core / inplace_run / airflow_dag_inplace_tracking.py
dag_id=dag_id,
                execution_date=execution_date,
                task_id=task_id,
                try_number=try_number,
            )
    except Exception as ex:
        logger.info("Failed to get airlfow context info from spark job: %s", ex)

    return None


class AirflowOperatorRuntimeTask(Task):
    task_is_system = False
    _conf__track_source_code = False

    dag_id = parameter[str]
    execution_date = parameter[datetime.datetime]

    def _initialize(self):
        super(AirflowOperatorRuntimeTask, self)._initialize()
        self.task_meta.task_functional_call = ""

        self.task_meta.task_command_line = generate_airflow_cmd(
            dag_id=self.dag_id,
            task_id=self.task_id,
            execution_date=self.execution_date,
            is_root_task=False,
        )

    @classmethod
    def build_from_airflow_context(cls, af_context):
        # we can't actually run it; we don't even know when it's going to finish
github databand-ai / dbnd / modules / dbnd / src / dbnd / _core / task / base_task.py
    _conf_auto_read_params = True  # enables auto-read of params
    _conf_confirm_on_kill_msg = None  # get user confirmation on task kill if not empty
    _conf__require_run_dump_file = False

    # this is the state of autoread
    _task_auto_read_original = None
    _task_auto_read = None
    ####
    # execution
    # will be used by Registry
    task_definition = None  # type: TaskDefinition

    # user can override this with his configuration
    defaults = None  # type: Dict[ParameterDefinition, Any]

    validate_no_extra_params = parameter.enum(ParamValidation).system(
        description="validate that all configured keys for a task have a matching parameter definition"
    )

    @classmethod
    def get_task_family(cls):
        return cls.task_definition.task_family

    def __init__(self, **kwargs):
        super(_BaseTask, self).__init__()

        # most of the time we will use it as TaskMetaCtrl - we want this to be type hint!
        self.task_meta = kwargs["task_meta"]  # type: TaskMeta
        self._params = TaskParameters(self)

        for p_value in self.task_meta.task_params:
            setattr(self, p_value.name, p_value.value)
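The validate_no_extra_params definition above uses parameter.enum(...), which constrains a parameter to the members of an Enum class. A minimal sketch; MyMode and ModeConfig are illustrative:

import enum

from dbnd._core.parameter.parameter_builder import parameter
from dbnd._core.task import config


class MyMode(enum.Enum):
    strict = "strict"
    warn = "warn"
    disabled = "disabled"


class ModeConfig(config.Config):
    # enum-typed parameter with a default value
    mode = parameter.enum(MyMode).value(MyMode.warn)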
github databand-ai / dbnd / modules / dbnd / src / dbnd / _core / settings / log.py
)[bool]

    send_body_to_server_max_size = parameter(
        default=16 * 1024 * 1024,  # 16MB
        description="Max log file size in bytes to be sent to server.\n"
        "\t* use 0 for unlimited;\n"
        "\t* use -1 to disable;\n"
        "\t* use negative (e.g. -1000) to get log's 'head' instead of 'tail'.\n"
        "Default: 16MB.",
    )[int]

    remote_logging_disabled = parameter.help(
        "for tasks using a cloud environment, don't copy the task log to cloud storage"
    ).value(False)

    sqlalchemy_print = parameter(description="enable sqlalchemy logger").value(False)
    sqlalchemy_trace = parameter(description="trace sqlalchemy queries").value(False)
    api_profile = parameter(description="profile api calls").value(False)

    def _initialize(self):
        super(LoggingConfig, self)._initialize()
        self.task_log_file_formatter = None

    def format_exception_as_str(self, exc_info, isolate=True):
        if self.exception_simple:
            return format_exception_as_str(exc_info)

        try:
            tbvaccine = TBVaccine(
                no_colors=self.exception_no_color,
                show_vars=False,
                skip_non_user_on_isolate=True,
github databand-ai / dbnd / modules / dbnd / src / dbnd / _core / task / task.py
        Note that setting this value with ``@property`` will not work, because this
        is a class level value.
    """

    _task_band_result = output(default=None, system=True)
    _meta_output = output(
        system=True,
        output_name="meta",
        output_ext="",
        target_config=folder,
        significant=False,
        description="Location of all internal outputs (e.g. metrics)",
    )
    task_band = output.json(output_name="band")

    task_enabled = parameter.system(scope=ParameterScope.children)[bool]
    task_enabled_in_prod = parameter.system(scope=ParameterScope.children)[bool]

    # for permanent bump of task version use Task.task_class_version
    task_version = parameter(
        description="task version, directly affects task signature",
        scope=ParameterScope.children,
    )[VersionStr]

    task_class_version = parameter.value(
        default=DEFAULT_CLASS_VERSION,
        system=True,
        description="task code version, "
        "use when you want a persistent change in your task version",
    )

    task_env = parameter.value(