ret = self.read_csv(path=path,
                    dtype=dtype,
                    parse_dates=parse_timestamps,
                    converters=converters,
                    quoting=csv.QUOTE_ALL,
                    max_result_size=max_result_size)
if max_result_size is None:
    if len(ret.index) > 0:
        for col in parse_dates:
            if str(ret[col].dtype) == "object":
                ret[col] = ret[col].apply(lambda x: date(*[int(y) for y in x.split("-")]))
            else:
                ret[col] = ret[col].dt.date.replace(to_replace={pd.NaT: None})
    return ret
else:
    return Pandas._apply_dates_to_generator(generator=ret, parse_dates=parse_dates)
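
# A minimal, standalone sketch of the date handling above: object columns holding
# "YYYY-MM-DD" strings are turned into datetime.date values, while datetime64
# columns are reduced to dates with NaT mapped to None. Column names and sample
# data below are made up for illustration.
import pandas as pd
from datetime import date

df = pd.DataFrame({
    "as_text": ["2019-01-01", "2019-02-15"],                 # parsed as object
    "as_ts": pd.to_datetime(["2019-01-01 10:00:00", None]),  # parsed as datetime64
})
df["as_text"] = df["as_text"].apply(lambda x: date(*[int(y) for y in x.split("-")]))
df["as_ts"] = df["as_ts"].dt.date.replace(to_replace={pd.NaT: None})
print(df.dtypes)  # both columns now hold datetime.date objects (and None for NaT)
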
if partition_cols is None:
    partition_cols = []
if cast_columns is None:
    cast_columns = {}
dataframe = Pandas.normalize_columns_names_athena(dataframe, inplace=inplace)
cast_columns = {Athena.normalize_column_name(k): v for k, v in cast_columns.items()}
logger.debug(f"cast_columns: {cast_columns}")
partition_cols = [Athena.normalize_column_name(x) for x in partition_cols]
logger.debug(f"partition_cols: {partition_cols}")
dataframe = Pandas.drop_duplicated_columns(dataframe=dataframe, inplace=inplace)
if compression is not None:
    compression = compression.lower()
file_format = file_format.lower()
if file_format == "csv":
    if compression not in Pandas.VALID_CSV_COMPRESSIONS:
        raise InvalidCompression(
            f"{compression} isn't a valid CSV compression. Try: {Pandas.VALID_CSV_COMPRESSIONS}")
elif file_format == "parquet":
    if compression not in Pandas.VALID_PARQUET_COMPRESSIONS:
        raise InvalidCompression(
            f"{compression} isn't a valid PARQUET compression. Try: {Pandas.VALID_PARQUET_COMPRESSIONS}")
else:
    raise UnsupportedFileFormat(file_format)
if dataframe.empty:
    raise EmptyDataframe()
if ((mode == "overwrite") or ((mode == "overwrite_partitions") and  # noqa
                              (not partition_cols))):
    self._session.s3.delete_objects(path=path)
elif mode not in ["overwrite_partitions", "append"]:
    raise UnsupportedWriteMode(mode)
objects_paths = self.data_to_s3(dataframe=dataframe,
                                path=path,
                                partition_cols=partition_cols,
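
# Standalone sketch of the validation pattern used above, with assumed value sets
# (the real ones are the Pandas.VALID_CSV_COMPRESSIONS and
# Pandas.VALID_PARQUET_COMPRESSIONS class attributes, which may differ).
# ValueError stands in for the library's own exception types.
VALID_CSV_COMPRESSIONS = (None, "gzip")                 # assumption for illustration
VALID_PARQUET_COMPRESSIONS = (None, "snappy", "gzip")   # assumption for illustration


def validate_write_args(file_format, compression, mode):
    file_format = file_format.lower()
    compression = compression.lower() if compression is not None else None
    if file_format == "csv":
        if compression not in VALID_CSV_COMPRESSIONS:
            raise ValueError(f"{compression} isn't a valid CSV compression.")
    elif file_format == "parquet":
        if compression not in VALID_PARQUET_COMPRESSIONS:
            raise ValueError(f"{compression} isn't a valid PARQUET compression.")
    else:
        raise ValueError(f"Unsupported file format: {file_format}")
    if mode not in ("overwrite", "overwrite_partitions", "append"):
        raise ValueError(f"Unsupported write mode: {mode}")


validate_write_args(file_format="parquet", compression="snappy", mode="overwrite")
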
def _find_terminator(body, sep, quoting, quotechar, lineterminator):
    """
    Search for a suspicious line terminator (from end to start).
    :param body: String
    :param sep: Same as pandas.read_csv()
    :param quoting: Same as pandas.read_csv()
    :param quotechar: Same as pandas.read_csv()
    :param lineterminator: Same as pandas.read_csv()
    :return: The index of the suspect line terminator
    """
    try:
        last_index = None
        if quoting == csv.QUOTE_ALL:
            while True:
                profile = Pandas._extract_terminator_profile(body=body,
                                                             sep=sep,
                                                             quotechar=quotechar,
                                                             lineterminator=lineterminator,
                                                             last_index=last_index)
                if profile["last_terminator_suspect_index"] and profile["first_non_special_byte_index"]:
                    if profile["quote_counter"] % 2 == 0 or profile["quote_counter"] == 0:
                        last_index = profile["last_terminator_suspect_index"]
                    else:
                        index = profile["last_terminator_suspect_index"]
                        break
                else:
                    raise LineTerminatorNotFound()
        else:
            index = body.rindex(lineterminator.encode(encoding="utf-8"))
    except ValueError:
        raise LineTerminatorNotFound()
    return index
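
# The non-QUOTE_ALL branch above boils down to a reverse search for the last line
# terminator in a byte chunk; bytes.rindex() raises ValueError when the terminator
# is absent, which is what gets translated into LineTerminatorNotFound.
# Standalone illustration with made-up data:
chunk = b'1,"a"\n2,"b"\n3,"partial'
try:
    cut = chunk.rindex("\n".encode(encoding="utf-8"))
except ValueError:
    raise RuntimeError("no line terminator found in chunk")
complete, remainder = chunk[:cut], chunk[cut + 1:]
print(complete)   # b'1,"a"\n2,"b"' -> safe to parse now
print(remainder)  # b'3,"partial'  -> carried over to the next chunk
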
def _data_to_s3_dataset_writer(dataframe,
                               path,
                               partition_cols,
                               preserve_index,
                               compression,
                               session_primitives,
                               file_format,
                               cast_columns=None,
                               extra_args=None,
                               isolated_dataframe=False):
    objects_paths = []
    dataframe = Pandas._cast_pandas(dataframe=dataframe, cast_columns=cast_columns)
    cast_columns_materialized = {c: t for c, t in cast_columns.items() if c not in partition_cols}
    if not partition_cols:
        object_path = Pandas._data_to_s3_object_writer(dataframe=dataframe,
                                                       path=path,
                                                       preserve_index=preserve_index,
                                                       compression=compression,
                                                       session_primitives=session_primitives,
                                                       file_format=file_format,
                                                       cast_columns=cast_columns_materialized,
                                                       extra_args=extra_args,
                                                       isolated_dataframe=isolated_dataframe)
        objects_paths.append(object_path)
    else:
        dataframe = Pandas._cast_pandas(dataframe=dataframe, cast_columns=cast_columns)
        for keys, subgroup in dataframe.groupby(partition_cols):
            subgroup = subgroup.drop(partition_cols, axis="columns")
            if not isinstance(keys, tuple):
                keys = (keys, )
            subdir = "/".join([f"{name}={val}" for name, val in zip(partition_cols, keys)])
csv_extra_args = {}
sep = extra_args.get("sep")
if sep is not None:
    csv_extra_args["sep"] = sep
serde = extra_args.get("serde")
if serde is not None:
    if serde == "OpenCSVSerDe":
        csv_extra_args["quoting"] = csv.QUOTE_ALL
        csv_extra_args["escapechar"] = "\\"
    elif serde == "LazySimpleSerDe":
        csv_extra_args["quoting"] = csv.QUOTE_NONE
        csv_extra_args["escapechar"] = "\\"
csv_buffer = bytes(
    dataframe.to_csv(None, header=False, index=preserve_index, compression=compression, **csv_extra_args),
    "utf-8")
Pandas._write_csv_to_s3_retrying(fs=fs, path=path, buffer=csv_buffer)
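
# Standalone sketch of the CSV serialization above: the OpenCSVSerDe settings map to
# quoting=csv.QUOTE_ALL plus a backslash escapechar, and the frame is rendered to an
# in-memory UTF-8 buffer instead of a file. Sample data is made up for illustration.
import csv
import pandas as pd

df = pd.DataFrame({"id": [1, 2], "name": ["foo", 'say "hi"']})
csv_extra_args = {"quoting": csv.QUOTE_ALL, "escapechar": "\\"}
csv_buffer = bytes(
    df.to_csv(None, header=False, index=False, **csv_extra_args),
    "utf-8")
print(csv_buffer)
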
:param procs_io_bound: Number of cores used for I/O-bound tasks
:param cast_columns: Dictionary of column names and Athena/Glue types to cast (e.g. {"col name": "bigint", "col2 name": "int"}) (only for the "parquet" file_format)
:param extra_args: Extra arguments specific to each file format (e.g. "sep" for CSV)
:param inplace: True is cheapest (CPU and memory), but False leaves your DataFrame intact
:return: List of objects written on S3
"""
if partition_cols is None:
    partition_cols = []
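
# Shapes of the arguments documented above, as a hedged usage sketch: cast_columns
# maps column names to Athena/Glue types (parquet only) and extra_args carries
# format-specific options such as the CSV separator. The session/df names here are
# placeholders, not part of the library.
cast_columns = {"col name": "bigint", "col2 name": "int"}  # example from the docstring
partition_cols = ["year", "month"]
extra_args = {"sep": "|"}                                  # e.g. "sep" for CSV
# A call would then look roughly like (argument names follow this function's signature):
# session.pandas.to_s3(dataframe=df, path="s3://bucket/prefix/", file_format="parquet",
#                      partition_cols=partition_cols, cast_columns=cast_columns, mode="overwrite")
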
:param lineterminator: Same as pandas.read_csv()
:param quotechar: Same as pandas.read_csv()
:param quoting: Same as pandas.read_csv()
:param escapechar: Same as pandas.read_csv()
:param parse_dates: Same as pandas.read_csv()
:param infer_datetime_format: Same as pandas.read_csv()
:param encoding: Same as pandas.read_csv()
:param converters: Same as pandas.read_csv()
:return: Pandas DataFrame, or an iterator of Pandas DataFrames if max_result_size is not None
"""
bucket_name, key_path = self._parse_path(path)
client_s3 = self._session.boto3_session.client(service_name="s3",
                                               use_ssl=True,
                                               config=self._session.botocore_config)
if max_result_size:
    ret = Pandas._read_csv_iterator(client_s3=client_s3,
                                    bucket_name=bucket_name,
                                    key_path=key_path,
                                    max_result_size=max_result_size,
                                    header=header,
                                    names=names,
                                    usecols=usecols,
                                    dtype=dtype,
                                    sep=sep,
                                    thousands=thousands,
                                    decimal=decimal,
                                    lineterminator=lineterminator,
                                    quotechar=quotechar,
                                    quoting=quoting,
                                    escapechar=escapechar,
                                    parse_dates=parse_dates,
                                    infer_datetime_format=infer_datetime_format,
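
# The _parse_path() call above splits an "s3://bucket/key" URI into its bucket and
# key parts; a standalone equivalent of that split (not the library's implementation):
def parse_s3_path(path):
    path = path.replace("s3://", "", 1)
    bucket_name, _, key_path = path.partition("/")
    return bucket_name, key_path


bucket, key = parse_s3_path("s3://my-bucket/some/prefix/file.csv")
print(bucket, key)  # my-bucket some/prefix/file.csv
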
:param quoting: Same as pandas.read_csv()
:param escapechar: Same as pandas.read_csv()
:param parse_dates: Same as pandas.read_csv()
:param infer_datetime_format: Same as pandas.read_csv()
:param encoding: Same as pandas.read_csv()
:param converters: Same as pandas.read_csv()
:return: Pandas Dataframe
"""
metadata = s3.S3.head_object_with_retry(client=client_s3, bucket=bucket_name, key=key_path)
logger.debug(f"metadata: {metadata}")
total_size = metadata["ContentLength"]
logger.debug(f"total_size: {total_size}")
if total_size <= 0:
    raise EmptyS3Object(metadata)
elif total_size <= max_result_size:
    yield Pandas._read_csv_once(client_s3=client_s3,
                                bucket_name=bucket_name,
                                key_path=key_path,
                                header=header,
                                names=names,
                                usecols=usecols,
                                dtype=dtype,
                                sep=sep,
                                thousands=thousands,
                                decimal=decimal,
                                lineterminator=lineterminator,
                                quotechar=quotechar,
                                quoting=quoting,
                                escapechar=escapechar,
                                parse_dates=parse_dates,
                                infer_datetime_format=infer_datetime_format,
                                encoding=encoding,
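
# The size check above relies on the object's Content-Length from a HEAD request.
# A plain boto3 equivalent of that lookup (bucket and key below are placeholders):
import boto3

client_s3 = boto3.client("s3")
metadata = client_s3.head_object(Bucket="my-bucket", Key="some/prefix/file.csv")
total_size = metadata["ContentLength"]
print(total_size)  # object size in bytes; 0 would trigger EmptyS3Object above
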
if (type(path) == str) or (len(path) == 1):
    path = path[0] if type(path) == list else path  # type: ignore
    df = Pandas._read_parquet_path(
        session_primitives=session_primitives,
        path=path,  # type: ignore
        columns=columns,
        filters=filters,
        procs_cpu_bound=procs_cpu_bound)
else:
    df = Pandas._read_parquet_path(session_primitives=session_primitives,
                                   path=path[0],
                                   columns=columns,
                                   filters=filters,
                                   procs_cpu_bound=procs_cpu_bound)
    for p in path[1:]:
        df_aux = Pandas._read_parquet_path(session_primitives=session_primitives,
                                           path=p,
                                           columns=columns,
                                           filters=filters,
                                           procs_cpu_bound=procs_cpu_bound)
        df = pd.concat(objs=[df, df_aux], ignore_index=True)
return df
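
# The multi-path branch above is plain "read each path, then concat"; the same
# pattern with pandas' own reader (local file names are placeholders):
import pandas as pd

paths = ["part-0000.parquet", "part-0001.parquet", "part-0002.parquet"]
df = pd.read_parquet(paths[0])
for p in paths[1:]:
    df_aux = pd.read_parquet(p)
    df = pd.concat(objs=[df, df_aux], ignore_index=True)
print(len(df.index))
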