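# The snippets below assume roughly these imports (a hedged reconstruction for
# the Kedro 0.15.x series, where CSVLocalDataSet still exists and SparkDataSet
# lives in contrib):
from pathlib import Path

import pytest
from pyspark.sql.functions import col

from kedro.contrib.io.pyspark import SparkDataSet
from kedro.io import CSVLocalDataSet, Version
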
def default_csv(name):
    return CSVLocalDataSet(name)

def test_fails_with_remote_path(self, path):
    pattern = "seems to be a remote file"
    with pytest.raises(ValueError, match=pattern):
        CSVLocalDataSet(filepath=path, save_args={"sep": ","})

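# A minimal sketch of the behaviour under test (the s3:// URL is a hypothetical
# example of the parametrized remote path): CSVLocalDataSet validates its
# filepath at construction time and refuses anything that looks remote.
try:
    CSVLocalDataSet(filepath="s3://my-bucket/data.csv")
except ValueError as err:
    assert "seems to be a remote file" in str(err)
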
def test_save_options_csv(self, tmp_path, sample_spark_df):
    # To cross-check the Spark save operation, write a single Spark partition
    # with csv format and retrieve it with Kedro CSVLocalDataSet
    temp_dir = Path(str(tmp_path / "test_data"))
    spark_data_set = SparkDataSet(
        filepath=str(temp_dir),
        file_format="csv",
        save_args={"sep": "|", "header": True},
    )
    spark_df = sample_spark_df.coalesce(1)
    spark_data_set.save(spark_df)

    single_csv_file = [
        f for f in temp_dir.iterdir() if f.is_file() and f.suffix == ".csv"
    ][0]

    csv_local_data_set = CSVLocalDataSet(
        filepath=str(single_csv_file), load_args={"sep": "|"}
    )
    pandas_df = csv_local_data_set.load()

    assert pandas_df[pandas_df["name"] == "Alex"]["age"][0] == 31

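# Design note: coalesce(1) forces Spark to write a single partition, so exactly
# one part-*.csv file appears under temp_dir and CSVLocalDataSet can read it
# back directly with the matching "|" separator.
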
def test_load_options_csv(self, tmp_path, sample_pandas_df):
    filepath = str(tmp_path / "data")
    local_csv_data_set = CSVLocalDataSet(filepath=filepath)
    local_csv_data_set.save(sample_pandas_df)

    spark_data_set = SparkDataSet(
        filepath=filepath, file_format="csv", load_args={"header": True}
    )
    spark_df = spark_data_set.load()

    assert spark_df.filter(col("Name") == "Alex").count() == 1

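# For reference, a hedged sketch of roughly what that load() amounts to in
# plain PySpark ("spark" is assumed to be an active SparkSession; load_args are
# forwarded to the reader, so header=True picks up the column names that
# CSVLocalDataSet wrote):
spark_df = spark.read.load(filepath, format="csv", header=True)
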
def test_sequential_save_and_load(self, dummy_dataframe, filepath):
    """Tests if the correct load version is logged when two datasets are saved
    sequentially."""
    dataset1 = CSVLocalDataSet(
        filepath=filepath,
        save_args={"sep": ","},
        version=Version(None, "2000-01-01"),
    )
    dataset2 = CSVLocalDataSet(
        filepath=filepath,
        save_args={"sep": ","},
        version=Version(None, "2001-01-01"),
    )

    dataset1.save(dummy_dataframe)
    last_save_version1 = dataset1.get_last_save_version()

    dataset2.save(dummy_dataframe)
    last_save_version2 = dataset2.get_last_save_version()

    dataset2.load()
    last_load_version = dataset2.get_last_load_version()

    assert last_save_version2 == last_load_version
    assert last_save_version1 != last_save_version2

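# A hedged sketch of the Version semantics exercised above (path hypothetical):
# Version(load_version, save_version) pins which timestamped copy a versioned
# dataset reads and writes; a load version of None means "latest".
data_set = CSVLocalDataSet(
    filepath="data/people.csv",
    version=Version(None, "2000-01-01"),  # load latest, save as "2000-01-01"
)
data_set.save(dummy_dataframe)
assert data_set.get_last_save_version() == "2000-01-01"
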
@pytest.fixture(params=[dict()])  # decorator reconstructed; request.param needs a parametrized fixture
def csv_data_set(filepath, request):
    return CSVLocalDataSet(filepath=filepath, save_args=request.param)

@pytest.fixture  # decorator reconstructed; this reads like a pytest fixture
def versioned_csv_data_set(filepath, load_version, save_version):
    return CSVLocalDataSet(
        filepath=filepath,
        save_args={"sep": ","},
        version=Version(load_version, save_version),
    )

def test_datasets_on_add(self, data_catalog_from_config):
    """Check datasets are updated correctly after adding."""
    data_catalog_from_config.add("new_dataset", CSVLocalDataSet("some_path"))

    assert isinstance(
        data_catalog_from_config.datasets.new_dataset, CSVLocalDataSet
    )
    assert isinstance(data_catalog_from_config.datasets.boats, CSVLocalDataSet)

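# For orientation, a minimal sketch of the add() behaviour being tested (paths
# hypothetical): DataCatalog.add registers a dataset under a name, after which
# it is reachable both through catalog.load() and as an attribute of
# catalog.datasets.
from kedro.io import DataCatalog

catalog = DataCatalog({"boats": CSVLocalDataSet("data/boats.csv")})
catalog.add("new_dataset", CSVLocalDataSet("data/new.csv"))
assert isinstance(catalog.datasets.new_dataset, CSVLocalDataSet)
boats_df = catalog.load("boats")  # delegates to CSVLocalDataSet.load()
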
def prepare_csv_data_with_tabs(context):
    # BDD step helper: "context" carries state shared between behave steps.
    context.read_csv_path = create_sample_csv()
    context.write_csv_path = create_temp_csv()
    context.csv_data_set = CSVLocalDataSet(
        filepath=context.read_csv_path,
        load_args={"sep": "\t"},
        save_args={"index": False, "sep": "\t"},
    )
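
# Design note: this step wires a tab-separated round trip. load_args and
# save_args pass straight through to pandas.read_csv / DataFrame.to_csv, so
# sep="\t" must appear in both, and index=False stops pandas from writing its
# index as an extra column on save.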