Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
def _read_fp_multifile(fs, fs_token, paths, columns=None, categories=None, index=None):
"""Read dataset with fastparquet by assuming metadata from first file"""
from fastparquet import ParquetFile
from fastparquet.util import analyse_paths, get_file_scheme, join_path
base, fns = analyse_paths(paths)
parsed_paths = [join_path(p) for p in paths]
scheme = get_file_scheme(fns)
pf = ParquetFile(paths[0], open_with=fs.open)
pf.file_scheme = scheme
pf.cats = _paths_to_cats(fns, scheme)
(
meta,
_,
index_name,
out_type,
all_columns,
index_names,
storage_name_mapping,
) = _pf_validation(pf, columns, index, categories, [])
name = "read-parquet-" + tokenize(fs_token, paths, all_columns, categories)
dsk = {
(name, i): (
_read_pf_simple,
)
if "_metadata" not in fns:
fast_metadata = False
else:
if "_metadata" in fns:
# We have a _metadata file, let's use it
pf = ParquetFile(
base + fs.sep + "_metadata",
open_with=fs.open,
sep=fs.sep,
**kwargs.get("file", {})
)
else:
# Rely on metadata for 0th file.
# Will need to pass a list of paths to read_partition
scheme = get_file_scheme(fns)
pf = ParquetFile(paths[0], open_with=fs.open, **kwargs.get("file", {}))
pf.file_scheme = scheme
pf.cats = _paths_to_cats(fns, scheme)
parts = paths.copy()
elif fs.isdir(paths[0]):
# This is a directory, check for _metadata, then _common_metadata
paths = fs.glob(paths[0] + fs.sep + "*")
base, fns = _analyze_paths(paths, fs)
if "_metadata" in fns:
# Using _metadata file (best-case scenario)
pf = ParquetFile(
base + fs.sep + "_metadata",
open_with=fs.open,
sep=fs.sep,
**kwargs.get("file", {})
)
elif gather_statistics is not False:
# Scan every file
pf = ParquetFile(paths, open_with=fs.open, **kwargs.get("file", {}))
fast_metadata = False
else:
# Use _common_metadata file if it is available.
# Otherwise, just use 0th file
if "_common_metadata" in fns:
pf = ParquetFile(
base + fs.sep + "_common_metadata",
open_with=fs.open,
**kwargs.get("file", {})
)
else:
pf = ParquetFile(paths[0], open_with=fs.open, **kwargs.get("file", {}))
scheme = get_file_scheme(fns)
pf.file_scheme = scheme
pf.cats = _paths_to_cats(fns, scheme)
parts = paths.copy()
else:
# There is only one file to read
pf = ParquetFile(
paths[0], open_with=fs.open, sep=fs.sep, **kwargs.get("file", {})
)
return parts, pf, gather_statistics, fast_metadata