How to use the kedro.io.LambdaDataSet function in kedro

To help you get started, we’ve selected a few kedro examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github quantumblacklabs / kedro / tests / io / test_lambda_data_set.py View on Github external
def test_release_not_implemented(self):
        """Check that `release` does nothing by default"""
        data_set = LambdaDataSet(None, None)
        data_set.release()
github quantumblacklabs / kedro / tests / io / test_lambda_data_set.py View on Github external
def test_save_not_callable(self):
        pattern = (
            r"`save` function for LambdaDataSet must be a Callable\. "
            r"Object of type `str` provided instead\."
        )
        with pytest.raises(DataSetError, match=pattern):
            LambdaDataSet(None, "save")
github quantumblacklabs / kedro-airflow / tests / test_runner.py View on Github external
first_task: first_op,
        middle_task: middle_op,
        last_task: last_op,
    }[python_callable]

    def operator_arguments(task_id):
        args = {"lambda-none-a": {"retries": 1}, "lambda-b-none": {"retries": 2}}
        return args.get(task_id, {})

    # actually call the runner to do the conversion
    dag = Mock()
    pipeline = Pipeline([first_node, last_node, middle_node])
    catalog = DataCatalog(
        {
            "a": LambdaDataSet(load=None, save=None),
            "b": LambdaDataSet(load=None, save=None),
        }
    )
    AirflowRunner(dag, None, operator_arguments).run(pipeline, catalog)

    # check the create task calls
    create_task.assert_has_calls(
        [
            call(first_node, catalog),
            call(middle_node, catalog),
            call(last_node, catalog),
        ],
        any_order=True,
    )

    # check the operator constructor calls
    operator.assert_has_calls(
github quantumblacklabs / kedro / tests / pipeline / test_node_run.py View on Github external
def mocked_dataset(mocker):
    load = mocker.Mock(return_value=42)
    save = mocker.Mock()
    return LambdaDataSet(load, save)
github quantumblacklabs / kedro / tests / io / test_lambda_data_set.py View on Github external
def test_release_invocation(self, mocker):
        """Test the basic `release` method invocation"""
        mocked_release = mocker.Mock()
        data_set = LambdaDataSet(None, None, None, mocked_release)
        data_set.release()
        mocked_release.assert_called_once_with()
github quantumblacklabs / kedro / tests / io / test_data_catalog.py View on Github external
def test_exists_not_implemented(self, caplog):
        """Test calling `exists` on the data set, which didn't implement it"""
        catalog = DataCatalog(data_sets={"test": LambdaDataSet(None, None)})
        result = catalog.exists("test")

        log_record = caplog.records[0]
        assert log_record.levelname == "WARNING"
        assert (
            "`exists()` not implemented for `LambdaDataSet`. "
            "Assuming output does not exist." in log_record.message
        )
        assert result is False
github quantumblacklabs / kedro / tests / io / test_lambda_data_set.py View on Github external
pass  # pragma: no cover

    def _dummy_save():
        pass  # pragma: no cover

    def _dummy_exists():
        return False  # pragma: no cover

    def _dummy_release():
        pass  # pragma: no cover

    assert "LambdaDataSet(load=)" in str(
        LambdaDataSet(_dummy_load, None)
    )
    assert "LambdaDataSet(save=)" in str(
        LambdaDataSet(None, _dummy_save)
    )
    assert "LambdaDataSet(exists=)" in str(
        LambdaDataSet(None, None, _dummy_exists)
    )
    assert (
        "LambdaDataSet(release=)"
        in str(LambdaDataSet(None, None, None, _dummy_release))
    )

    # __init__ keys alphabetically sorted, None values not shown
    expected = (
        "LambdaDataSet(exists=, "
        "load=, "
        "save=)"
    )
    actual = str(LambdaDataSet(_dummy_load, _dummy_save, _dummy_exists, None))
github quantumblacklabs / kedro / tests / io / test_lambda_data_set.py View on Github external
def test_exists_invocation(self, mocker):
        """Test the basic `exists` method invocation"""
        mocked_exists = mocker.Mock(return_value=True)
        data_set = LambdaDataSet(None, None, mocked_exists)
        result = data_set.exists()
        mocked_exists.assert_called_once_with()
        assert result is True
github quantumblacklabs / kedro / features / steps / io_core_steps.py View on Github external
def prepare_missing_csv(context):
    sample_csv = "/var/missing_csv_file.csv"
    context.csv_data_set = LambdaDataSet(
        load=lambda: pd.read_csv(sample_csv), save=None
    )
github quantumblacklabs / kedro / features / steps / io_core_steps.py View on Github external
def data_set_with_no_save(context):
    context.csv_data_set = LambdaDataSet(load=None, save=None)