How to use the kedro.io.PartitionedDataSet class in kedro

To help you get started, we’ve selected a few kedro examples that show popular ways PartitionedDataSet is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github quantumblacklabs / kedro / tests / io / test_partitioned_dataset.py View on Github external
def test_describe(self, dataset):
        """Check the string representation of a ``PartitionedDataSet``.

        The representation must contain the partition path and the
        ``CSVS3DataSet`` dataset type. ``dataset_config`` is expected to show
        up only when ``dataset`` is a dict with keys other than ``"type"``.
        """
        s3_path = "s3://{}/foo/bar".format(BUCKET_NAME)
        description = str(PartitionedDataSet(s3_path, dataset))

        assert "path={}".format(s3_path) in description
        assert "dataset_type=CSVS3DataSet" in description

        # Extra configuration exists only for dict definitions that carry
        # something besides the dataset type itself.
        has_extra_config = isinstance(dataset, dict) and bool(
            dataset.keys() - {"type"}
        )
        assert ("dataset_config" in description) == has_extra_config
github quantumblacklabs / kedro / tests / io / test_partitioned_dataset.py View on Github external
def test_load(
        self, dataset, local_csvs, partitioned_data_pandas, suffix, expected_num_parts
    ):
        """Load partitions from a local folder, optionally filtered by suffix.

        Verifies the number of partitions returned, that each lazily loaded
        frame equals the expected one, and that partition ids do not end with
        ``suffix`` when a suffix is given.
        """
        partitions = PartitionedDataSet(
            str(local_csvs), dataset, filename_suffix=suffix
        ).load()

        assert len(partitions) == expected_num_parts
        for name, loader in partitions.items():
            # ``load()`` hands back callables; data is read only on invocation.
            frame = loader()
            assert_frame_equal(frame, partitioned_data_pandas[name + suffix])
            if suffix:
                assert not name.endswith(suffix)
github quantumblacklabs / kedro / tests / io / test_partitioned_dataset.py View on Github external
def test_invalid_dataset(self, dataset, local_csvs):
        """Check the error raised when a partition loader fails.

        Every partition loader is expected to raise ``DataSetError`` whose
        message names ``ParquetLocalDataSet``, reports a corrupt parquet
        footer and mentions the partition being loaded.
        """
        expected_error = (
            r"Failed while loading data from data set ParquetLocalDataSet(.*)"
        )
        partitions = PartitionedDataSet(str(local_csvs), dataset).load()

        for name, loader in partitions.items():
            with pytest.raises(DataSetError, match=expected_error) as caught:
                loader()
            message = str(caught.value)
            assert "Invalid parquet file. Corrupt footer." in message
            assert str(name) in message
github quantumblacklabs / kedro / tests / io / test_partitioned_dataset.py View on Github external
def test_exists(self, dataset, mocked_csvs_in_s3):
        """Check ``exists()`` for a populated, a missing and an empty location.

        The dataset rooted at ``mocked_csvs_in_s3`` must exist; one rooted at
        a nested ``empty/folder`` path must not, both before and after that
        folder is created through ``s3fs``.
        """
        populated = PartitionedDataSet(mocked_csvs_in_s3, dataset)
        assert populated.exists()

        empty_dir = "/".join((mocked_csvs_in_s3, "empty", "folder"))
        assert not PartitionedDataSet(empty_dir, dataset).exists()

        # Creating the folder alone must not make the dataset "exist".
        filesystem = s3fs.S3FileSystem()
        filesystem.mkdir(empty_dir)
        assert not PartitionedDataSet(empty_dir, dataset).exists()
github quantumblacklabs / kedro / tests / io / test_partitioned_dataset.py View on Github external
def test_load_args(self, mocker):
        """Check that ``load_args`` are forwarded to the filesystem ``find``.

        ``fsspec.filesystem`` is mocked so that ``find`` yields one fake
        partition; after loading, ``find`` must have been called exactly once
        with the dataset path and the supplied ``load_args``.
        """
        partition = "fake_partition"
        filesystem_factory = mocker.patch("fsspec.filesystem")
        find_mock = filesystem_factory.return_value.find
        find_mock.return_value = [partition]

        root = str(Path.cwd())
        find_kwargs = {
            "maxdepth": 42,
            "withdirs": True,
        }
        pds = PartitionedDataSet(root, "CSVLocalDataSet", load_args=find_kwargs)
        # Bypass path-to-partition conversion so the key is the fake name.
        mocker.patch.object(pds, "_path_to_partition", return_value=partition)

        loaded = pds.load()
        assert loaded.keys() == {partition}
        find_mock.assert_called_once_with(root, **find_kwargs)
github quantumblacklabs / kedro / tests / io / test_partitioned_dataset.py View on Github external
def test_load(self, dataset, mocked_csvs_in_s3, partitioned_data_pandas):
        """Load partitions from the mocked S3 location.

        The partition ids must equal the keys of ``partitioned_data_pandas``
        and each lazily loaded frame must equal its expected counterpart.
        """
        partitions = PartitionedDataSet(mocked_csvs_in_s3, dataset).load()

        assert partitions.keys() == partitioned_data_pandas.keys()
        for name, loader in partitions.items():
            # ``load()`` hands back callables; data is read only on invocation.
            frame = loader()
            assert_frame_equal(frame, partitioned_data_pandas[name])
github quantumblacklabs / kedro / tests / io / test_partitioned_dataset.py View on Github external
def test_credentials(
        self, mocker, credentials, expected_pds_creds, expected_dataset_creds
    ):
        """Check how credentials are passed on and that they stay out of the repr.

        With ``fsspec.filesystem`` mocked, constructing the dataset is expected
        to call it twice, the last call being for the ``"file"`` protocol with
        ``expected_pds_creds``. When ``expected_dataset_creds`` is truthy it
        must be stored under ``"credentials"`` in the underlying dataset
        config. Finally, no key or value of ``credentials`` may appear in
        ``str(pds)``.
        """
        mocked_filesystem = mocker.patch("fsspec.filesystem")
        path = str(Path.cwd())
        pds = PartitionedDataSet(path, "CSVLocalDataSet", credentials=credentials)

        assert mocked_filesystem.call_count == 2
        mocked_filesystem.assert_called_with("file", **expected_pds_creds)
        if expected_dataset_creds:
            assert pds._dataset_config["credentials"] == expected_dataset_creds

        str_repr = str(pds)

        def _assert_not_in_repr(value):
            """Recursively assert that ``value`` and its contents are not in the repr."""
            if isinstance(value, dict):
                for k_, v_ in value.items():
                    _assert_not_in_repr(k_)
                    _assert_not_in_repr(v_)
            if value is not None:
                assert str(value) not in str_repr

        # The helper was previously defined but never invoked, so the repr
        # was never actually checked for leaked credentials.
        _assert_not_in_repr(credentials)
github quantumblacklabs / kedro / tests / io / test_partitioned_dataset.py View on Github external
def test_no_partitions(self, tmpdir):
        """Check that loading from an empty directory raises ``DataSetError``.

        The error message must report that no partitions were found in the
        temporary directory.
        """
        pds = PartitionedDataSet(str(tmpdir), "CSVLocalDataSet")

        # ``match`` is interpreted as a regular expression, so the message is
        # escaped: temporary paths can contain regex metacharacters (for
        # example backslashes on Windows) that would otherwise break the match.
        pattern = re.escape("No partitions found in `{}`".format(str(tmpdir)))
        with pytest.raises(DataSetError, match=pattern):
            pds.load()
github quantumblacklabs / kedro / tests / io / test_partitioned_dataset.py View on Github external
def test_filepath_arg_warning(self, pds_config, filepath_arg):
        """Check the warning for a file path key in the dataset definition.

        Constructing the dataset from ``pds_config`` must emit a
        ``UserWarning`` saying that the ``filepath_arg`` key will be
        overwritten by the partition path.
        """
        template = (
            "`{}` key must not be specified in the dataset definition as it "
            "will be overwritten by partition path"
        )
        # Escape the message because ``match`` treats it as a regex.
        expected_warning = re.escape(template.format(filepath_arg))
        with pytest.warns(UserWarning, match=expected_warning):
            PartitionedDataSet(**pds_config)
github quantumblacklabs / kedro / tests / io / test_partitioned_dataset.py View on Github external
def test_release(self, dataset, mocked_csvs_in_s3):
        """Check that ``release()`` makes the next load see storage changes.

        After ``p2.csv`` is removed from storage, a repeated ``load()`` must
        still return the same partition keys as before. Once ``release()`` is
        called, a fresh ``load()`` must differ from the initial one by exactly
        the removed partition.
        """
        removed = "p2.csv"
        pds = PartitionedDataSet(mocked_csvs_in_s3, dataset)
        before_removal = pds.load()
        assert removed in before_removal

        # Delete the partition directly in storage, behind the dataset's back.
        s3fs.S3FileSystem().rm("/".join((mocked_csvs_in_s3, removed)))
        assert before_removal.keys() == pds.load().keys()

        pds.release()
        after_release = pds.load()
        assert before_removal.keys() ^ after_release.keys() == {removed}