:param preserve_index: Should the DataFrame index also be written to S3?
:param mode: "append", "overwrite" or "overwrite_partitions"
:param compression: None, "gzip", "snappy", etc.
:param procs_cpu_bound: Number of cores used for CPU-bound tasks
:param procs_io_bound: Number of cores used for I/O-bound tasks
:param cast_columns: Dictionary of column names and the Athena/Glue types they should be cast to (e.g. {"col name": "bigint", "col2 name": "int"}) (only for the "parquet" file_format)
:param extra_args: Extra arguments specific to each file format (e.g. "sep" for CSV)
:param inplace: True is cheapest (CPU and memory), but False leaves your original DataFrame intact
:return: List of objects written to S3
"""
if partition_cols is None:
    partition_cols = []
if cast_columns is None:
    cast_columns = {}
dataframe = Pandas.normalize_columns_names_athena(dataframe, inplace=inplace)
cast_columns = {Athena.normalize_column_name(k): v for k, v in cast_columns.items()}
logger.debug(f"cast_columns: {cast_columns}")
partition_cols = [Athena.normalize_column_name(x) for x in partition_cols]
logger.debug(f"partition_cols: {partition_cols}")
dataframe = Pandas.drop_duplicated_columns(dataframe=dataframe, inplace=inplace)
if compression is not None:
    compression = compression.lower()
file_format = file_format.lower()
if file_format == "csv":
    if compression not in Pandas.VALID_CSV_COMPRESSIONS:
        raise InvalidCompression(
            f"{compression} isn't a valid CSV compression. Try: {Pandas.VALID_CSV_COMPRESSIONS}")
elif file_format == "parquet":
    if compression not in Pandas.VALID_PARQUET_COMPRESSIONS:
        raise InvalidCompression(
            f"{compression} isn't a valid PARQUET compression. Try: {Pandas.VALID_PARQUET_COMPRESSIONS}")
else:
    raise UnsupportedFileFormat(file_format)
if dataframe.empty:
    raise EmptyDataframe()  # assumed continuation: an empty DataFrame is rejected before any write
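
# --- Usage sketch (illustrative, not part of the library source above) -------
# A minimal example of how the validation above is exercised, assuming the
# awswrangler 0.x Session API (session.pandas.to_s3) and a hypothetical bucket.
import pandas as pd
import awswrangler

df = pd.DataFrame({"Col Name": [1, 2], "part col": ["a", "b"]})
session = awswrangler.Session()
session.pandas.to_s3(
    dataframe=df,
    path="s3://my-bucket/my-prefix/",     # hypothetical bucket/prefix
    file_format="parquet",                # anything else raises UnsupportedFileFormat
    mode="overwrite",
    partition_cols=["part col"],          # normalized to Athena-safe names
    compression="snappy",                 # checked against VALID_PARQUET_COMPRESSIONS
    cast_columns={"Col Name": "bigint"},  # keys are normalized the same way
)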
def normalize_columns_names_athena(dataframe, inplace=True):
    # Rename every column to an Athena/Glue-compatible name via Athena.normalize_column_name.
    if inplace is False:
        dataframe = dataframe.copy(deep=True)
    dataframe.columns = [Athena.normalize_column_name(x) for x in dataframe.columns]
    return dataframe
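
# --- Usage sketch (illustrative) ----------------------------------------------
# Shows the helper on its own, assuming Athena.normalize_column_name maps
# arbitrary labels to lowercase, Athena-safe identifiers.
import pandas as pd

raw = pd.DataFrame({"Col Name": [1], "AnotherCol": [2]})
clean = Pandas.normalize_columns_names_athena(raw, inplace=False)  # raw keeps its original labels
print(list(clean.columns))  # e.g. ["col_name", "another_col"], depending on the normalizer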