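# Shared imports assumed by the snippets below (a sketch for a kedro
# 0.15.x-era environment, where PartitionedDataSet, DataSetError and the
# CSVLocalDataSet/CSVS3DataSet types are importable from kedro.io). The
# pytest fixtures referenced by the tests (dataset, local_csvs,
# mocked_csvs_in_s3, partitioned_data_pandas, credentials, pds_config, ...)
# are defined in the original test module and are not reproduced here.
import re
from pathlib import Path

import pytest
import s3fs
from pandas.testing import assert_frame_equal

from kedro.io import DataSetError, PartitionedDataSet

BUCKET_NAME = "fake_bucket_name"  # placeholder; the source module defines its own constant

# str(pds) should surface the path, the dataset type and any extra dataset
# config passed in the dataset definition.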
def test_describe(self, dataset):
    path = "s3://{}/foo/bar".format(BUCKET_NAME)
    pds = PartitionedDataSet(path, dataset)

    assert "path={}".format(path) in str(pds)
    assert "dataset_type=CSVS3DataSet" in str(pds)
    if isinstance(dataset, dict) and dataset.keys() - {"type"}:
        assert "dataset_config" in str(pds)
    else:
        assert "dataset_config" not in str(pds)
def test_load(
    self, dataset, local_csvs, partitioned_data_pandas, suffix, expected_num_parts
):
    pds = PartitionedDataSet(str(local_csvs), dataset, filename_suffix=suffix)
    loaded_partitions = pds.load()

    assert len(loaded_partitions) == expected_num_parts
    for partition_id, load_func in loaded_partitions.items():
        df = load_func()
        assert_frame_equal(df, partitioned_data_pandas[partition_id + suffix])
        if suffix:
            assert not partition_id.endswith(suffix)
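
# Pointing a Parquet dataset at CSV partitions should fail at load time with
# a DataSetError that names the offending partition.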
def test_invalid_dataset(self, dataset, local_csvs):
    pds = PartitionedDataSet(str(local_csvs), dataset)
    loaded_partitions = pds.load()

    for partition, df_loader in loaded_partitions.items():
        pattern = r"Failed while loading data from data set ParquetLocalDataSet(.*)"
        with pytest.raises(DataSetError, match=pattern) as exc_info:
            df_loader()
        error_message = str(exc_info.value)
        assert "Invalid parquet file. Corrupt footer." in error_message
        assert str(partition) in error_message
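
# exists() is True only when at least one partition is present; an empty
# folder, even once actually created, does not count.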
def test_exists(self, dataset, mocked_csvs_in_s3):
    assert PartitionedDataSet(mocked_csvs_in_s3, dataset).exists()

    empty_folder = "/".join([mocked_csvs_in_s3, "empty", "folder"])
    assert not PartitionedDataSet(empty_folder, dataset).exists()

    s3fs.S3FileSystem().mkdir(empty_folder)
    assert not PartitionedDataSet(empty_folder, dataset).exists()
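
# load_args should be forwarded verbatim to the filesystem's find() call
# used for partition discovery.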
def test_load_args(self, mocker):
    fake_partition_name = "fake_partition"
    mocked_filesystem = mocker.patch("fsspec.filesystem")
    mocked_find = mocked_filesystem.return_value.find
    mocked_find.return_value = [fake_partition_name]

    path = str(Path.cwd())
    load_args = {"maxdepth": 42, "withdirs": True}
    pds = PartitionedDataSet(path, "CSVLocalDataSet", load_args=load_args)
    mocker.patch.object(pds, "_path_to_partition", return_value=fake_partition_name)

    assert pds.load().keys() == {fake_partition_name}
    mocked_find.assert_called_once_with(path, **load_args)
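
# S3 counterpart of test_load above; in the source module the two appear to
# live in separate test classes (local vs S3), so the repeated name is not a
# redefinition there.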
def test_load(self, dataset, mocked_csvs_in_s3, partitioned_data_pandas):
    pds = PartitionedDataSet(mocked_csvs_in_s3, dataset)
    loaded_partitions = pds.load()

    assert loaded_partitions.keys() == partitioned_data_pandas.keys()
    for partition_id, load_func in loaded_partitions.items():
        df = load_func()
        assert_frame_equal(df, partitioned_data_pandas[partition_id])
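
# Credentials must be routed to the filesystem and the underlying dataset
# as appropriate, and must never leak into the dataset's repr.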
def test_credentials(
    self, mocker, credentials, expected_pds_creds, expected_dataset_creds
):
    mocked_filesystem = mocker.patch("fsspec.filesystem")
    path = str(Path.cwd())
    pds = PartitionedDataSet(path, "CSVLocalDataSet", credentials=credentials)

    assert mocked_filesystem.call_count == 2
    mocked_filesystem.assert_called_with("file", **expected_pds_creds)
    if expected_dataset_creds:
        assert pds._dataset_config["credentials"] == expected_dataset_creds

    str_repr = str(pds)

    def _assert_not_in_repr(value):
        if isinstance(value, dict):
            for k_, v_ in value.items():
                _assert_not_in_repr(k_)
                _assert_not_in_repr(v_)
        if value is not None:
            assert str(value) not in str_repr

    # The helper was defined but never invoked in the excerpt; it must be
    # applied to the original credentials for the test to assert anything.
    _assert_not_in_repr(credentials)
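
# Loading from a directory that contains no partitions raises DataSetError.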
def test_no_partitions(self, tmpdir):
    pds = PartitionedDataSet(str(tmpdir), "CSVLocalDataSet")

    pattern = "No partitions found in `{}`".format(str(tmpdir))
    with pytest.raises(DataSetError, match=pattern):
        pds.load()
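
# A filepath-like key in the dataset definition triggers a warning, since
# the partition path will overwrite it.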
def test_filepath_arg_warning(self, pds_config, filepath_arg):
    pattern = (
        "`{}` key must not be specified in the dataset definition as it "
        "will be overwritten by partition path".format(filepath_arg)
    )
    with pytest.warns(UserWarning, match=re.escape(pattern)):
        PartitionedDataSet(**pds_config)
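
# load() results are cached; release() drops the cache so a partition
# deleted from S3 disappears from the next load().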
def test_release(self, dataset, mocked_csvs_in_s3):
    partition_to_remove = "p2.csv"
    pds = PartitionedDataSet(mocked_csvs_in_s3, dataset)
    initial_load = pds.load()
    assert partition_to_remove in initial_load

    s3 = s3fs.S3FileSystem()
    s3.rm("/".join([mocked_csvs_in_s3, partition_to_remove]))
    cached_load = pds.load()
    assert initial_load.keys() == cached_load.keys()

    pds.release()
    load_after_release = pds.load()
    assert initial_load.keys() ^ load_after_release.keys() == {partition_to_remove}