fs.mkdirs(path, exist_ok=True)
object_encoding = kwargs.pop("object_encoding", "utf8")
index_cols = kwargs.pop("index_cols", [])
if object_encoding == "infer" or (
isinstance(object_encoding, dict) and "infer" in object_encoding.values()
):
raise ValueError(
'"infer" not allowed as object encoding, '
"because this required data in memory."
)
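# Editorial note (not in the original source): fastparquet accepts concrete
# object encodings such as "utf8", "bytes", or "json", or a per-column
# mapping like {"text_col": "utf8"}. "infer" is rejected above because
# inferring an encoding would require materializing the data in memory.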
if append:
try:
# to append to a dataset without _metadata, need to load
# _common_metadata or any data file here
pf = fastparquet.api.ParquetFile(path, open_with=fs.open, sep=fs.sep)
except (IOError, ValueError):
# append for create
append = False
if append:
if pf.file_scheme not in ["hive", "empty", "flat"]:
raise ValueError(
"Requested file scheme is hive, but existing file scheme is not."
)
elif (set(pf.columns) != set(df.columns) - set(partition_on)) or (
set(partition_on) != set(pf.cats)
):
raise ValueError(
"Appended columns not the same.\n"
"Previous: {} | New: {}".format(pf.columns, list(df.columns))
)
elif (pd.Series(pf.dtypes).loc[pf.columns] != df[pf.columns].dtypes).any():
    raise ValueError(
        "Appended dtypes differ.\n{}".format(
            set(pf.dtypes.items()) ^ set(df.dtypes.items())
        )
    )
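# Illustration (not part of the dask source above): a minimal sketch of how
# the append checks surface through the public API. Assumes dask and
# fastparquet are installed; the directory name "append_demo" is hypothetical.
import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(pd.DataFrame({"a": [1, 2]}), npartitions=1)
ddf.to_parquet("append_demo", engine="fastparquet")

# A second frame with a different column set fails the column check above:
other = dd.from_pandas(pd.DataFrame({"b": [3.0]}), npartitions=1)
try:
    other.to_parquet("append_demo", engine="fastparquet", append=True)
except ValueError as err:
    print(err)  # "Appended columns not the same. ..."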
def _read_fastparquet(
fs,
fs_token,
paths,
columns=None,
filters=None,
categories=None,
index=None,
infer_divisions=None,
):
import fastparquet
if isinstance(paths, fastparquet.api.ParquetFile):
pf = paths
elif len(paths) > 1:
if infer_divisions is not False:
# this scans all the files, allowing index/divisions and filtering
pf = fastparquet.ParquetFile(paths, open_with=fs.open, sep=fs.sep)
else:
return _read_fp_multifile(
fs, fs_token, paths, columns=columns, categories=categories, index=index
)
else:
try:
pf = fastparquet.ParquetFile(
paths[0] + fs.sep + "_metadata", open_with=fs.open, sep=fs.sep
)
except Exception:
pf = fastparquet.ParquetFile(paths[0], open_with=fs.open, sep=fs.sep)
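# Illustration (not from the source): the same _metadata-first fallback,
# written against fastparquet's public API. "dataset_dir" and the data file
# name are hypothetical paths to a dataset written earlier.
import fastparquet

try:
    # a dataset with a _metadata summary file loads without touching the data
    pf = fastparquet.ParquetFile("dataset_dir/_metadata")
except Exception:
    # otherwise open a data file directly
    pf = fastparquet.ParquetFile("dataset_dir/part.0.parquet")
print(pf.columns)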
divisions = df.divisions
if write_index is True or (write_index is None and df.known_divisions):
df = df.reset_index()
index_cols = [df.columns[0]]
else:
ignore_divisions = True
index_cols = []
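# Illustration (not from the source): how write_index interacts with known
# divisions, sketched via the public API; names and paths are hypothetical.
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"x": range(4)}, index=pd.RangeIndex(4, name="idx"))
ddf = dd.from_pandas(pdf, npartitions=2)  # divisions are known here

# With known divisions the index is reset into a column (as above) and
# written out, so a later read can restore the index and its divisions:
ddf.to_parquet("index_demo", engine="fastparquet", write_index=True)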