def test_prevent_override(self, versioned_pickle_data_set, dummy_dataframe):
    """Check the error when attempting to override the data set if the
    corresponding pickle file for a given save version already exists."""
    versioned_pickle_data_set.save(dummy_dataframe)

    pattern = (
        r"Save path \`.+\` for PickleLocalDataSet\(.+\) must "
        r"not exist if versioning is enabled\."
    )
    with pytest.raises(DataSetError, match=pattern):
        versioned_pickle_data_set.save(dummy_dataframe)
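
# For context, a minimal sketch of the two fixtures the test above assumes.
# The names match the test signature, but the exact wiring (tmp_path, the
# pinned save version string) is an assumption, not taken from the source.
import pandas as pd
import pytest

from kedro.io import DataSetError, PickleLocalDataSet, Version


@pytest.fixture
def dummy_dataframe():
    # Any small frame will do; the test only needs something to save twice.
    return pd.DataFrame({"col1": [1, 2], "col2": [4, 5]})


@pytest.fixture
def versioned_pickle_data_set(tmp_path):
    # Pinning the save version makes both save() calls target the same
    # path, so the second one must raise DataSetError.
    return PickleLocalDataSet(
        filepath=str(tmp_path / "test.pkl"),
        version=Version(None, "2019-01-01T00.00.00.000Z"),
    )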
        load_version: Version string to be used for ``load`` operation if
            the data set is versioned. Has no effect on the data set
            if versioning was not enabled.
        save_version: Version string to be used for ``save`` operation if
            the data set is versioned. Has no effect on the data set
            if versioning was not enabled.
    Raises:
        DataSetError: If the function fails to parse the configuration provided.
    Returns:
        2-tuple: (Dataset class object, configuration dictionary)
    """
    save_version = save_version or generate_timestamp()
    config = copy.deepcopy(config)
    if "type" not in config:
        raise DataSetError("`type` is missing from DataSet catalog configuration")

    class_obj = config.pop("type")
    if isinstance(class_obj, str):
        try:
            class_obj = load_obj(class_obj, "kedro.io")
        except ImportError:
            raise DataSetError(
                "Cannot import module when trying to load type `{}`.".format(class_obj)
            )
        except AttributeError:
            raise DataSetError("Class `{}` not found.".format(class_obj))
    if not issubclass(class_obj, AbstractDataSet):
        raise DataSetError(
            "DataSet type `{}.{}` is invalid: all data set types must extend "
            "`AbstractDataSet`.".format(class_obj.__module__, class_obj.__qualname__)
        )
def _exists(self) -> bool:
    try:
        load_path = self._get_load_path()
    except DataSetError:
        # No load version could be resolved: the data set does not exist yet.
        return False
    return self._gcs.exists(load_path)
def _save(self, data: Any) -> None:
    save_path = Path(self._get_save_path())
    save_path.parent.mkdir(parents=True, exist_ok=True)
    with save_path.open("wb") as local_file:
        try:
            self.BACKENDS[self._backend].dump(data, local_file, **self._save_args)
        except Exception:  # pylint: disable=broad-except
            # Checks if the error is due to serialisation or not
            try:
                self.BACKENDS[self._backend].dumps(data)
            except Exception:
                raise DataSetError(
                    "{} cannot be serialized. {} can only be used with "
                    "serializable data".format(
                        str(data.__class__), str(self.__class__.__name__)
                    )
                )
            else:
                raise  # pragma: no cover
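
# The dumps() probe above separates "the data cannot be serialised" from
# "the write itself failed". The same pattern in isolation, as a sketch
# using plain pickle (the BACKENDS mapping is not reproduced here):
import pickle


def save_with_diagnosis(data, path):
    with open(path, "wb") as file:
        try:
            pickle.dump(data, file)
        except Exception:
            try:
                pickle.dumps(data)
            except Exception:
                # In-memory serialisation also fails: blame the data.
                raise TypeError(
                    "{} cannot be pickled".format(type(data).__name__)
                )
            # dump() failed but dumps() worked: an I/O problem, re-raise it.
            raise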
"""Creates a new instance of ``LambdaDataSet`` with references to the
required input/output data set methods.
Args:
load: Method to load data from a data set.
save: Method to save data to a data set.
exists: Method to check whether output data already exists.
If None, no exists method is added.
Raises:
DataSetError: If load and/or save is specified, but is not a Callable.
"""
if load is not None and not callable(load):
raise DataSetError(
"`load` function for LambdaDataSet must be a Callable. "
"Object of type `{}` provided instead.".format(load.__class__.__name__)
)
if save is not None and not callable(save):
raise DataSetError(
"`save` function for LambdaDataSet must be a Callable. "
"Object of type `{}` provided instead.".format(save.__class__.__name__)
)
if exists is not None and not callable(exists):
raise DataSetError(
"`exists` function for LambdaDataSet must be a Callable. "
"Object of type `{}` provided instead.".format(
exists.__class__.__name__
)
)
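
# A usage sketch for these checks. LambdaDataSet is real kedro.io API;
# the CSV helpers below are illustrative stand-ins.
import pandas as pd

from kedro.io import LambdaDataSet


def _load():
    return pd.read_csv("data/01_raw/cars.csv")


def _save(data):
    data.to_csv("data/01_raw/cars.csv", index=False)


cars = LambdaDataSet(load=_load, save=_save)

# A non-callable trips the validation above:
# LambdaDataSet(load="not-a-function", save=_save)  # raises DataSetError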
class DataSetError(Exception):
    """``DataSetError`` raised by ``AbstractDataSet`` implementations
    in case of failure of input/output methods.

    ``AbstractDataSet`` implementations should provide instructive
    information in case of failure.
    """

    pass
class DataSetNotFoundError(DataSetError):
    """``DataSetNotFoundError`` raised by ``DataCatalog`` class in case of
    trying to use a non-existing data set.
    """

    pass


class DataSetAlreadyExistsError(DataSetError):
    """``DataSetAlreadyExistsError`` raised by ``DataCatalog`` class in case
    of trying to add a data set which already exists in the ``DataCatalog``.
    """

    pass


class VersionNotFoundError(DataSetError):
    """``VersionNotFoundError`` raised by ``AbstractVersionedDataSet`` implementations
    in case of no load versions available for the data set.
    """

    pass
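
# Because everything above derives from DataSetError, callers can catch
# narrowly or broadly. A sketch (the kedro.io import paths are assumed):
from kedro.io import DataCatalog, DataSetError, DataSetNotFoundError

catalog = DataCatalog()  # empty catalog, purely for illustration

try:
    catalog.load("does_not_exist")
except DataSetNotFoundError:
    print("no such entry in the catalog")
except DataSetError:
    print("the entry exists but loading failed")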
def _get_sql_alchemy_missing_error() -> DataSetError:
    return DataSetError(
        "The SQL dialect in your connection is not supported by "
        "SQLAlchemy. Please refer to "
        "https://docs.sqlalchemy.org/en/13/core/engines.html#supported-databases "
        "for more information."
    )


HTTP_PROTOCOLS = ("http", "https")
PROTOCOL_DELIMITER = "://"


class AbstractDataSet(abc.ABC):
    def load(self) -> Any:
        self._logger.debug("Loading %s", str(self))
        try:
            return self._load()
        except DataSetError:
            raise
        except Exception as exc:
            # This exception handling is by design as the composed data sets
            # can throw any type of exception.
            message = "Failed while loading data from data set {}.\n{}".format(
                str(self), str(exc)
            )
            raise DataSetError(message) from exc
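
# A minimal concrete subclass, sketched to show what the wrapper above
# buys: _load() may raise anything and the caller still sees DataSetError.
# The class itself is illustrative, not part of kedro.
class InMemoryTextDataSet(AbstractDataSet):
    def __init__(self):
        self._data = None

    def _load(self) -> str:
        if self._data is None:
            raise ValueError("nothing saved yet")  # wrapped into DataSetError
        return self._data

    def _save(self, data: str) -> None:
        self._data = data

    def _describe(self):
        return {"stored": self._data is not None}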
            To find all supported connection string formats, see here:
            https://docs.sqlalchemy.org/en/13/core/engines.html#database-urls
        load_args: Provided to underlying pandas ``read_sql_query``
            function along with the connection string.
            To find all supported arguments, see here:
            https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_sql_query.html
            To find all supported connection string formats, see here:
            https://docs.sqlalchemy.org/en/13/core/engines.html#database-urls

    Raises:
        DataSetError: When either ``sql`` or ``con`` parameters is empty.
    """
    if not sql:
        raise DataSetError(
            "`sql` argument cannot be empty. Please provide a sql query"
        )
    if not (credentials and "con" in credentials and credentials["con"]):
        raise DataSetError(
            "`con` argument cannot be empty. Please "
            "provide a SQLAlchemy connection string."
        )

    default_load_args = {}  # type: Dict[str, Any]
    self._load_args = (
        {**default_load_args, **load_args}
        if load_args is not None
        else default_load_args
    )
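
# Construction sketch matching the checks above. SQLQueryDataSet is the
# kedro.io class this fragment appears to come from; the sqlite URL and
# query are illustrative.
from kedro.io import SQLQueryDataSet

trips = SQLQueryDataSet(
    sql="SELECT * FROM trips WHERE distance > 10",
    credentials={"con": "sqlite:///data/trips.db"},
    load_args={"index_col": "trip_id"},  # forwarded to pandas.read_sql_query
)

# Either missing piece raises DataSetError:
# SQLQueryDataSet(sql="", credentials={"con": "sqlite:///data/trips.db"})
# SQLQueryDataSet(sql="SELECT 1", credentials={})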