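# Method body (signature not shown in this snippet): normalizes the dataframe and
# arguments, validates the requested format/compression, and dispatches the write
# to data_to_s3().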
if partition_cols is None:
    partition_cols = []
if cast_columns is None:
    cast_columns = {}
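# Normalize column names to Athena-safe identifiers, and apply the same
# normalization to the cast-column keys and the partition column names.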
dataframe = Pandas.normalize_columns_names_athena(dataframe, inplace=inplace)
cast_columns = {Athena.normalize_column_name(k): v for k, v in cast_columns.items()}
logger.debug(f"cast_columns: {cast_columns}")
partition_cols = [Athena.normalize_column_name(x) for x in partition_cols]
logger.debug(f"partition_cols: {partition_cols}")
dataframe = Pandas.drop_duplicated_columns(dataframe=dataframe, inplace=inplace)
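# Validate the requested file format and compression codec.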
if compression is not None:
    compression = compression.lower()
file_format = file_format.lower()
if file_format == "csv":
    if compression not in Pandas.VALID_CSV_COMPRESSIONS:
        raise InvalidCompression(
            f"{compression} isn't a valid CSV compression. Try: {Pandas.VALID_CSV_COMPRESSIONS}")
elif file_format == "parquet":
    if compression not in Pandas.VALID_PARQUET_COMPRESSIONS:
        raise InvalidCompression(
            f"{compression} isn't a valid PARQUET compression. Try: {Pandas.VALID_PARQUET_COMPRESSIONS}")
else:
    raise UnsupportedFileFormat(file_format)
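# Refuse to write an empty dataframe.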
if dataframe.empty:
    raise EmptyDataframe()
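# "overwrite" (or "overwrite_partitions" with no partition columns) wipes the
# target prefix first; any mode other than "overwrite", "overwrite_partitions",
# or "append" is rejected.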
if ((mode == "overwrite") or ((mode == "overwrite_partitions") and # noqa
(not partition_cols))):
self._session.s3.delete_objects(path=path)
elif mode not in ["overwrite_partitions", "append"]:
raise UnsupportedWriteMode(mode)
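# Write the data files and collect the S3 paths of the objects created.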
objects_paths = self.data_to_s3(dataframe=dataframe,
                                path=path,
                                partition_cols=partition_cols,
                                preserve_index=preserve_index,
                                file_format=file_format,
                                mode=mode,
                                compression=compression,
                                cast_columns=cast_columns,
                                extra_args=extra_args)
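
# Helper that writes a single CSV/Parquet object to S3 and returns its path.
# (The name and leading parameters of the def line below are reconstructed from
# the helper's body; only the tail of its parameter list appears in the snippet.)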
@staticmethod
def _data_to_s3_object_writer(dataframe,
                              path,
                              preserve_index,
                              compression,
                              session_primitives,
                              file_format,
                              cast_columns=None,
                              extra_args=None,
                              isolated_dataframe=False):
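    # Resolve the S3 filesystem for PyArrow and make sure the target prefix exists.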
    fs = s3.get_fs(session_primitives=session_primitives)
    fs = pa.filesystem._ensure_filesystem(fs)
    s3.mkdir_if_not_exists(fs, path)
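    # Pick the file-name suffix that matches the compression codec.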
    if compression is None:
        compression_end = ""
    elif compression == "snappy":
        compression_end = ".snappy"
    elif compression == "gzip":
        compression_end = ".gz"
    else:
        raise InvalidCompression(compression)
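    # Name the object with a random GUID so objects written in parallel don't collide.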
    guid = pa.compat.guid()
    if file_format == "parquet":
        outfile = f"{guid}.parquet{compression_end}"
    elif file_format == "csv":
        outfile = f"{guid}.csv{compression_end}"
    else:
        raise UnsupportedFileFormat(file_format)
    object_path = "/".join([path, outfile])
if file_format == "parquet":
Pandas.write_parquet_dataframe(dataframe=dataframe,
path=object_path,
preserve_index=preserve_index,
compression=compression,
fs=fs,
cast_columns=cast_columns,
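    # The CSV branch and the return are not shown above; a sketch assuming
    # Pandas.write_csv_dataframe takes the analogous arguments plus extra_args:
    elif file_format == "csv":
        Pandas.write_csv_dataframe(dataframe=dataframe,
                                   path=object_path,
                                   preserve_index=preserve_index,
                                   compression=compression,
                                   fs=fs,
                                   extra_args=extra_args)
    return object_path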
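
# Usage sketch (illustrative, not part of the library source above): assumes the
# method whose body is shown first is Pandas.to_s3 and that a Pandas instance is
# exposed as `session.pandas` on awswrangler's Session; bucket/prefix are placeholders.
import awswrangler
import pandas as pd

df = pd.DataFrame({"id": [1, 2, 3], "name": ["foo", "boo", "bar"]})
session = awswrangler.Session()
session.pandas.to_s3(dataframe=df,
                     path="s3://my-bucket/my-prefix/",
                     file_format="parquet",
                     mode="overwrite",
                     preserve_index=False)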