if compression is None:
compression_end = ""
elif compression == "snappy":
compression_end = ".snappy"
elif compression == "gzip":
compression_end = ".gz"
else:
raise InvalidCompression(compression)
guid = pa.compat.guid()
if file_format == "parquet":
outfile = f"{guid}.parquet{compression_end}"
elif file_format == "csv":
outfile = f"{guid}.csv{compression_end}"
else:
raise UnsupportedFileFormat(file_format)
object_path = "/".join([path, outfile])
if file_format == "parquet":
Pandas.write_parquet_dataframe(dataframe=dataframe,
path=object_path,
preserve_index=preserve_index,
compression=compression,
fs=fs,
cast_columns=cast_columns,
isolated_dataframe=isolated_dataframe)
elif file_format == "csv":
Pandas.write_csv_dataframe(dataframe=dataframe,
path=object_path,
preserve_index=preserve_index,
compression=compression,
fs=fs,
extra_args=extra_args)
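# --- Illustrative sketch (not part of the library) ---------------------------
# A standalone version of the object-key naming used above: a GUID-based file
# name plus a format extension and an optional compression suffix. Here
# uuid.uuid4().hex stands in for pa.compat.guid(), KeyError stands in for
# InvalidCompression, and build_object_key is a hypothetical helper name.
import uuid

_COMPRESSION_SUFFIX = {None: "", "snappy": ".snappy", "gzip": ".gz"}

def build_object_key(path, file_format, compression=None):
    suffix = _COMPRESSION_SUFFIX[compression]  # KeyError for unsupported codecs
    outfile = f"{uuid.uuid4().hex}.{file_format}{suffix}"
    return "/".join([path.rstrip("/"), outfile])

# build_object_key("s3://bucket/prefix", "parquet", "snappy")
# -> "s3://bucket/prefix/<guid>.parquet.snappy"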
partition_cols = [Athena.normalize_column_name(x) for x in partition_cols]
logger.debug(f"partition_cols: {partition_cols}")
dataframe = Pandas.drop_duplicated_columns(dataframe=dataframe, inplace=inplace)
if compression is not None:
compression = compression.lower()
file_format = file_format.lower()
if file_format == "csv":
if compression not in Pandas.VALID_CSV_COMPRESSIONS:
raise InvalidCompression(
f"{compression} isn't a valid CSV compression. Try: {Pandas.VALID_CSV_COMPRESSIONS}")
elif file_format == "parquet":
if compression not in Pandas.VALID_PARQUET_COMPRESSIONS:
raise InvalidCompression(
f"{compression} isn't a valid PARQUET compression. Try: {Pandas.VALID_PARQUET_COMPRESSIONS}")
else:
raise UnsupportedFileFormat(file_format)
if dataframe.empty:
raise EmptyDataframe()
if ((mode == "overwrite") or ((mode == "overwrite_partitions") and # noqa
(not partition_cols))):
self._session.s3.delete_objects(path=path)
elif mode not in ["overwrite_partitions", "append"]:
raise UnsupportedWriteMode(mode)
objects_paths = self.data_to_s3(dataframe=dataframe,
path=path,
partition_cols=partition_cols,
preserve_index=preserve_index,
file_format=file_format,
mode=mode,
compression=compression,
procs_cpu_bound=procs_cpu_bound,
procs_io_bound=procs_io_bound)
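# --- Illustrative sketch (not part of the library) ---------------------------
# The write-mode decision above in isolation: "overwrite" always wipes the
# target prefix, "overwrite_partitions" wipes it only when no partition
# columns are given, and "append" never deletes. should_delete_prefix is a
# hypothetical helper and ValueError stands in for UnsupportedWriteMode.
def should_delete_prefix(mode, partition_cols):
    if mode not in ("overwrite", "overwrite_partitions", "append"):
        raise ValueError(f"Unsupported write mode: {mode}")
    return mode == "overwrite" or (mode == "overwrite_partitions" and not partition_cols)

# should_delete_prefix("overwrite", ["year"])            -> True
# should_delete_prefix("overwrite_partitions", ["year"]) -> False
# should_delete_prefix("overwrite_partitions", [])       -> True
# should_delete_prefix("append", [])                     -> False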
"""
Create a Glue table from a PySpark DataFrame written to S3.

:param dataframe: PySpark DataFrame
:param file_format: File format (e.g. "parquet", "csv")
:param partition_by: Columns used for partitioning
:param path: AWS S3 path
:param compression: Compression (e.g. "gzip", "snappy", "lzo")
:param sep: Separator token for CSV formats (e.g. ",", ";", "|")
:param serde: Serializer/Deserializer (e.g. "OpenCSVSerDe", "LazySimpleSerDe")
:param database: Glue database name
:param table: Glue table name. If not passed, extracted from the path
:param load_partitions: Load partitions after the table creation
:param replace_if_exists: Drop and recreate the table if it already exists
:return: None
"""
file_format = file_format.lower()
if file_format not in ["parquet", "csv"]:
raise UnsupportedFileFormat(file_format)
table = table if table else self._session.glue.parse_table_name(path)
table = table.lower().replace(".", "_")
logger.debug(f"table: {table}")
full_schema = dataframe.dtypes
if partition_by is None:
partition_by = []
schema = [x for x in full_schema if x[0] not in partition_by]
partitions_schema_tmp = {x[0]: x[1] for x in full_schema if x[0] in partition_by}
partitions_schema = [(x, partitions_schema_tmp[x]) for x in partition_by]
logger.debug(f"schema: {schema}")
logger.debug(f"partitions_schema: {partitions_schema}")
if replace_if_exists:
self._session.glue.delete_table_if_exists(database=database, table=table)
extra_args = {}
if file_format == "csv":
extra_args["sep"] = sep
preserve_index=True,
file_format="parquet",
mode="append",
num_procs=None,
num_files=2,
):
"""
Write the DataFrame to S3 as Parquet or CSV files.
"""
if not num_procs:
num_procs = mp.cpu_count()
if path[-1] == "/":
path = path[:-1]
file_format = file_format.lower()
if file_format not in ["parquet", "csv"]:
raise UnsupportedFileFormat(file_format)
partition_paths = None
if partition_cols is not None and len(partition_cols) > 0:
partition_paths = write_dataset_manager(
df=df,
path=path,
partition_cols=partition_cols,
session_primitives=session_primitives,
preserve_index=preserve_index,
file_format=file_format,
mode=mode,
num_procs=num_procs,
num_files=num_files,
)
else:
write_files_manager(
database, table, partition_paths, file_format, session_primitives=None
):
"""
Add a list of partitions to a Glue table.
"""
client = get_session(session_primitives=session_primitives).client("glue")
if not partition_paths:
return None
partitions = list()
for partition in partition_paths:
if file_format == "parquet":
partition_def = p_get_partition_definition(partition)
elif file_format == "csv":
partition_def = c_get_partition_definition(partition)
else:
raise UnsupportedFileFormat(file_format)
partitions.append(partition_def)
pages_num = int(ceil(len(partitions) / 100.0))
for _ in range(pages_num):
page = partitions[:100]
del partitions[:100]
client.batch_create_partition(
DatabaseName=database, TableName=table, PartitionInputList=page
)
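# --- Illustrative sketch (not part of the library) ---------------------------
# What a Parquet partition definition handed to batch_create_partition above
# might look like. The real p_get_partition_definition is not shown here, so
# this dict is an assumption based on the Glue PartitionInput structure; only
# the 100-item paging above is taken from the code itself.
def example_parquet_partition_definition(partition_path):
    # partition_path e.g. "s3://bucket/table/year=2024/month=05/"
    values = [p.split("=", 1)[1] for p in partition_path.rstrip("/").split("/") if "=" in p]
    return {
        "Values": values,
        "StorageDescriptor": {
            "Location": partition_path,
            "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
            },
        },
    }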