How to use the dbnd.parameter function in dbnd

To help you get started, we’ve selected a few dbnd.parameter examples based on popular ways it is used in public projects.

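Before diving into the snippets, here is a minimal sketch of the declaration styles you will see below: parameter[type] for a required value, parameter(default=...)[type] for a typed default, and parameter.value(...) when the type can be inferred from the default. The MyConfig and MyTask classes are hypothetical (not part of the dbnd codebase), and the sketch assumes Task and Config are importable from the top-level dbnd package, as the examples below suggest.

from dbnd import Config, Task, parameter


class MyConfig(Config):
    # typed parameter with an explicit default
    retries = parameter(default=3)[int]
    # shorthand: the type (bool) is inferred from the default value
    verbose = parameter.value(False)


class MyTask(Task):
    # required parameter: no default, only the type in brackets
    name = parameter[str]
    # parameter with a default value and a description
    greeting = parameter(default="hello", description="Greeting prefix")[str]

    def run(self):
        # parameters are available as plain attributes at run time
        print("%s, %s" % (self.greeting, self.name))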

Example from databand-ai/dbnd: modules/dbnd/test_dbnd/task/task_configuration/test_sub_config_override.py
from dbnd import auto_namespace, band, parameter
from dbnd._core.constants import CloudType
from dbnd_test_scenarios.test_common.task.factories import FooConfig, TTask


auto_namespace(scope=__name__)


class FirstTask(TTask):
    # exists only for local
    foo = parameter(default="FooConfig")[FooConfig]
    param = parameter(default="FirstTask.inline.param")[str]


class SecondTask(FirstTask):
    defaults = {
        FooConfig.bar: "SecondTask.defaults.bar",
        FooConfig.quz: "SecondTask.defaults.quz",
    }


@band(defaults={FooConfig.bar: "first_pipeline.defaults.bar"})
def first_pipeline():
    return SecondTask(param="first_pipeline.band.param").t_output


@band(defaults={FooConfig.quz: "second_pipeline.defaults.quz"})

Example from databand-ai/dbnd: modules/dbnd/test_dbnd/task/test_task_object.py
def test_task_caching(self):
        class DummyTask(Task):
            x = parameter[str]

        dummy_1 = DummyTask(x=1)
        dummy_2 = DummyTask(x=2)
        dummy_1b = DummyTask(x=1)

        assert dummy_1 != dummy_2
        assert dummy_1 == dummy_1b

Example from databand-ai/dbnd: plugins/dbnd-test-scenarios/src/dbnd_test_scenarios/test_common/task/factories.py
def run(self):
        super(TTaskWithMetricsAndInput, self).run()


class FooBaseTask(TTask):
    """
    used by all command line checkers
    """

    pass


class FooConfig(Config):
    bar = parameter(default="from_config")[str]
    quz = parameter(default="from_config")[str]


class TConfig(Config):
    _conf__task_family = "tconfig"
    config_value_s1 = parameter[str]
    config_value_s2 = parameter[str]


@task
def ttask_simple(tparam="1"):
    # type: (str) -> str
    return "result %s" % tparam


@task
def ttask_dataframe(tparam=1):

Example from databand-ai/dbnd: modules/dbnd/test_dbnd/task/parameters/test_task_parameters.py
def test_params_default_none(self):
        class TDefaultNone(TTask):
            p_str = parameter(default=None)[str]
            p_str_optional = parameter(default=None)[Optional[str]]

        target_task = TDefaultNone()
        assert target_task.p_str is None
        assert target_task.p_str_optional is None

Example from databand-ai/dbnd: modules/dbnd/test_dbnd/task/parameters/test_parameter_bool.py
from dbnd import dbnd_run_cmd, parameter
from dbnd_test_scenarios.test_common.task.factories import TTask


class Baz(TTask):
    bool = parameter(default=False)[bool]


class BazTrue(TTask):
    bool = parameter.value(True)


class TBoolWithDefault(TTask):
    x = parameter.value(default=True)


class TestTaskBoolParameters(object):
    def test_bool_false_default(self):
        result = dbnd_run_cmd(["Baz"])
        assert result.task.bool is False

    def test_bool_true(self):

Example from databand-ai/dbnd: plugins/dbnd-gcp/src/dbnd_gcp/apache_beam/apache_beam_task.py
class _ApacheBeamInlineTask(_BeamTask, _DecoratedTask):
    _conf__require_run_dump_file = True

    dataflow_build_pipeline = parameter(
        system=True, description="Build Pipeline object if not set"
    ).value(True)

    dataflow_wait_until_finish = parameter(
        system=True, description="Automatically wait for pipeline run finish"
    ).value(True)

    dataflow_submit_dbnd_info = parameter(
        system=True, description="Add databand data to PipelineOptions"
    ).value(True)

    dataflow_pipeline = parameter(system=True, description="Dataflow Pipeline").value(
        None
    )[object]

    def _task_run(self):
        super(_ApacheBeamInlineTask, self)._task_run()

    def _task_options(self, python_pipeline_options):
        # We use the save_main_session option because one or more DoFn's in this
        # workflow rely on global context (e.g., a module imported at module level).
        from apache_beam.options.pipeline_options import PipelineOptions

        user_params = self._params.get_param_values(user_only=True)

        class TaskOptions(PipelineOptions):
            @classmethod
            def _add_argparse_args(cls, parser):

Example from databand-ai/dbnd: plugins/dbnd-hdfs/src/dbnd_hdfs/fs/hdfs_pyox.py
class HdfsPyox(FileSystem, Config):
    _conf__task_family = "hdfs_knox"

    host = parameter.c(description="HDFS name node URL", default="localhost")[str]
    port = parameter.c(description="HDFS name node port", default=50070)[int]
    user = parameter.c(default="root")[str]
    base = parameter.c(
        default=None,
        description="Base url for Knox, mutually exclusive with HdfsConfig.host",
    )[str]
    secure = parameter.c(default=False)[bool]
    gateway = parameter.c(default=None)[str]
    password = parameter.c(default=None)[str]
    cookies = parameter.c(default=None)[str]
    bearer_token = parameter.c(default=None)[str]
    bearer_token_encode = parameter.c(default=True)[bool]

    @staticmethod
    def _remove_schema(path):
        if path.startswith(HDFS_SCHEMA):
            # drop the schema prefix (assuming HDFS_SCHEMA is "hdfs://", 7 characters)
            return path[7:]
        return path

    @property
    def client(self):  # type: () -> WebHDFS
        from pyox import WebHDFS

        return WebHDFS(
            host=self.host,
            port=self.port,
            username=self.user,
            base=self.base,

Example from databand-ai/dbnd: plugins/dbnd-docker/src/dbnd_docker/docker/docker_engine_config.py
from dbnd_docker.container_engine_config import ContainerEngineConfig
from dbnd_docker.docker.docker_task import DockerRunTask


logger = logging.getLogger(__name__)


class DockerEngineConfig(ContainerEngineConfig):
    _conf__task_family = "docker"

    def get_docker_ctrl(self, task_run):
        from dbnd_docker.docker.docker_task_run_ctrl import LocalDockerRunCtrl

        return LocalDockerRunCtrl(task_run=task_run)

    network = parameter(default=None, description="Docker Network to connect to")[str]

    api_version = parameter(
        description="Remote API version. "
        "Set to ``auto`` to automatically detect the server's version.,"
    )[str]
    docker_url = parameter(
        description="URL of the host running the docker daemon."
    ).value("unix://var/run/docker.sock")

    auto_remove = parameter(
        description="Auto-removal of the container on daemon side when the container's process exits."
    ).value(False)
    force_pull = parameter(
        description="Pull the docker image on every run. Default is False."
    ).value(False)

Example from databand-ai/dbnd: plugins/dbnd-qubole/src/dbnd_qubole/qubole_config.py
from dbnd_spark.spark_config import SparkEngineConfig


logger = logging.getLogger(__name__)


class Qubole(object):
    aws = "aws"


class QuboleConfig(SparkEngineConfig):
    """Databricks cloud for Apache Spark """

    _conf__task_family = "qubole"
    cluster_type = SparkClusters.qubole
    cloud = parameter(default="AWS", description="cloud")

    api_url = parameter(default="https://us.qubole.com/api").help(
        "API URL without version. like:'https://.qubole.com/api'"
    )[str]

    ui_url = parameter(default="https://api.qubole.com").help(
        "UI URL for accessing Qubole logs"
    )[str]

    api_token = parameter.help("API key of qubole account")[str]
    cluster_label = parameter().help("the label of the cluster to run the command on")[
        str
    ]

    status_polling_interval_seconds = parameter(default=10).help(
        "seconds to sleep between polling databricks for job status."