# Imports needed by the snippets below (the InvalidSerDe module path is
# assumed from awswrangler's public exception module):
import pandas as pd
import pytest

from awswrangler.exceptions import InvalidSerDe
def test_to_csv_serde_exception(
        session,
        bucket,
        database,
):
    dataframe = pd.read_csv("data_samples/nano.csv")
    with pytest.raises(InvalidSerDe):
        session.pandas.to_csv(dataframe=dataframe,
                              database=database,
                              path=f"s3://{bucket}/test/",
                              preserve_index=False,
                              mode="overwrite",
                              serde="foo")
# Method of awswrangler's Pandas class. The signature is reconstructed from
# the parameter list documented below; the default values are assumptions,
# not taken from the original snippet.
def to_csv(self, dataframe, path, sep=",", serde="OpenCSVSerDe",
           database=None, table=None, partition_cols=None,
           preserve_index=True, mode="append", procs_cpu_bound=None,
           procs_io_bound=None, inplace=True):
    """
    Write a Pandas DataFrame as CSV files on AWS S3, optionally
    registering the table metadata in the AWS Glue catalog.

    :param dataframe: Pandas DataFrame
    :param path: AWS S3 path (e.g. s3://bucket-name/folder_name/)
    :param sep: Same as pandas.to_csv()
    :param serde: SerDe library name (e.g. OpenCSVSerDe, LazySimpleSerDe)
    :param database: AWS Glue database name
    :param table: AWS Glue table name
    :param partition_cols: List of column names that will be partitions on S3
    :param preserve_index: Whether to preserve the DataFrame index on S3
    :param mode: "append", "overwrite" or "overwrite_partitions"
    :param procs_cpu_bound: Number of cores used for CPU-bound tasks
    :param procs_io_bound: Number of cores used for I/O-bound tasks
    :param inplace: True is cheapest (CPU and memory), but False leaves your DataFrame intact
    :return: List of objects written on S3
    """
    if serde not in Pandas.VALID_CSV_SERDES:
        raise InvalidSerDe(f"{serde} is not in the valid SerDe list ({Pandas.VALID_CSV_SERDES})")
    extra_args = {"sep": sep, "serde": serde}
    return self.to_s3(dataframe=dataframe,
                      path=path,
                      file_format="csv",
                      database=database,
                      table=table,
                      partition_cols=partition_cols,
                      preserve_index=preserve_index,
                      mode=mode,
                      compression=None,
                      procs_cpu_bound=procs_cpu_bound,
                      procs_io_bound=procs_io_bound,
                      extra_args=extra_args,
                      inplace=inplace)
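
# Note (assumption, not from the snippet): judging by the SerDe branches in the
# helpers below, the class attribute checked above is presumably
#
#     VALID_CSV_SERDES = ["OpenCSVSerDe", "LazySimpleSerDe"]
#
# so any other value (e.g. "foo", as in the test above) raises InvalidSerDe.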
# Builds the AWS Glue partition input for a CSV partition. The signature is
# reconstructed from the body, which references partition, compression and
# extra_args; the original snippet began mid-function.
def csv_partition_definition(partition, compression, extra_args=None):
    extra_args = {} if extra_args is None else extra_args
    compressed = False if compression is None else True
    sep = extra_args["sep"] if "sep" in extra_args else ","
    serde = extra_args.get("serde")
    if serde == "OpenCSVSerDe":
        serde_fullname = "org.apache.hadoop.hive.serde2.OpenCSVSerde"
        param = {
            "separatorChar": sep,
            "quoteChar": "\"",
            "escapeChar": "\\",
        }
    elif serde == "LazySimpleSerDe":
        serde_fullname = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"
        param = {"field.delim": sep, "escape.delim": "\\"}
    else:
        raise InvalidSerDe(f"{serde} is not in the valid SerDe list.")
    return {
        "StorageDescriptor": {
            "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
            "Location": partition[0],
            "Compressed": compressed,
            "SerdeInfo": {
                "Parameters": param,
                "SerializationLibrary": serde_fullname,
            },
            "StoredAsSubDirectories": False,
        },
        "Values": partition[1],
    }
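
# Illustrative sketch (not from the snippet): for a hypothetical partition
# tuple ("s3://bucket/table/year=2019/", ["2019"]) with compression=None and
# extra_args={"sep": ",", "serde": "OpenCSVSerDe"}, the function above returns:
#
#     {
#         "StorageDescriptor": {
#             "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
#             "Location": "s3://bucket/table/year=2019/",
#             "Compressed": False,
#             "SerdeInfo": {
#                 "Parameters": {"separatorChar": ",", "quoteChar": "\"", "escapeChar": "\\"},
#                 "SerializationLibrary": "org.apache.hadoop.hive.serde2.OpenCSVSerde",
#             },
#             "StoredAsSubDirectories": False,
#         },
#         "Values": ["2019"],
#     }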
# Builds the AWS Glue table input for a CSV table. The signature, the
# extra_args preamble and the leading OpenCSVSerDe branch are reconstructed
# from the elif/else structure below and from csv_partition_definition above;
# the original snippet began mid-function and is truncated at the end.
def csv_table_definition(table, partition_cols_schema, schema, compression, extra_args=None):
    extra_args = {} if extra_args is None else extra_args
    sep = extra_args["sep"] if "sep" in extra_args else ","
    serde = extra_args.get("serde")
    if serde == "OpenCSVSerDe":
        serde_fullname = "org.apache.hadoop.hive.serde2.OpenCSVSerde"
        param = {
            "separatorChar": sep,
            "quoteChar": "\"",
            "escapeChar": "\\",
        }
        # OpenCSVSerDe reads every column as string
        refined_par_schema = [(name, "string") for name, dtype in partition_cols_schema]
        refined_schema = [(name, "string") for name, dtype in schema]
    elif serde == "LazySimpleSerDe":
        serde_fullname = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"
        param = {"field.delim": sep, "escape.delim": "\\"}
        # LazySimpleSerDe keeps numeric types; everything else is cast to string
        dtypes_allowed = ["int", "bigint", "float", "double"]
        refined_par_schema = [(name, dtype) if dtype in dtypes_allowed else (name, "string")
                              for name, dtype in partition_cols_schema]
        refined_schema = [(name, dtype) if dtype in dtypes_allowed else (name, "string")
                          for name, dtype in schema]
    else:
        raise InvalidSerDe(f"{serde} is not in the valid SerDe list.")
    return {
        "Name": table,
        "PartitionKeys": [{
            "Name": x[0],
            "Type": x[1]
        } for x in refined_par_schema],
        "TableType": "EXTERNAL_TABLE",
        "Parameters": {
            "classification": "csv",
            "compressionType": str(compression).lower(),
            "typeOfData": "file",
            "delimiter": sep,
            "columnsOrdered": "true",
            "areColumnsQuoted": "false",
        },
        "StorageDescriptor": {