import json
import os
from operator import attrgetter

# petastorm helpers (import paths follow recent petastorm releases)
from petastorm import utils
from petastorm.compat import compat_get_metadata, compat_make_parquet_piece, compat_piece_read
from petastorm.errors import PetastormMetadataError
from petastorm.etl.dataset_metadata import ROW_GROUPS_PER_FILE_KEY
from petastorm.fs_utils import FilesystemResolver


def _index_columns(piece_info, dataset_url, partitions, indexers, schema, hdfs_driver='libhdfs3'):
    """Build indexes for the dataset piece described in piece_info.

    :param piece_info: description of the dataset piece
    :param dataset_url: dataset location
    :param partitions: dataset partitions
    :param indexers: list of indexer objects
    :param schema: dataset schema
    :param hdfs_driver: a string denoting the hdfs driver to use (if using a dataset on hdfs). Current choices are
        libhdfs (java through JNI) or libhdfs3 (C++)
    :return: list of indexers containing index data
    """
    # The resolver, running in the executor context, picks up the hadoop config from the environment
    resolver = FilesystemResolver(dataset_url, hdfs_driver=hdfs_driver)
    fs = resolver.filesystem()

    # Create the pyarrow piece for the single row group described by piece_info
    piece = compat_make_parquet_piece(piece_info.path, fs.open, row_group=piece_info.row_group,
                                      partition_keys=piece_info.partition_keys)

    # Collect the column names needed for indexing
    column_names = set()
    for indexer in indexers:
        column_names.update(indexer.column_names)

    # Read only the columns needed for indexing
    column_rows = compat_piece_read(piece, fs.open, columns=list(column_names),
                                    partitions=partitions).to_pandas().to_dict('records')

    # Decode column values
    decoded_rows = [utils.decode_row(row, schema) for row in column_rows]
    if not decoded_rows:
        raise ValueError('Cannot build index with empty decoded_rows, columns: {}, partitions: {}'
                         .format(column_names, partitions))
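
    # Sketch of the remainder of this routine (assumption: each indexer exposes a
    # build_index(rows, piece_index) method and piece_info carries a piece_index field),
    # satisfying the ":return: list of indexers containing index data" contract above.
    for indexer in indexers:
        indexer.build_index(decoded_rows, piece_info.piece_index)

    return indexers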


def load_row_groups(dataset):
    """Split the dataset pieces into one piece per row group, using the row-group counts
    stored in the dataset metadata.

    :param dataset: parquet dataset object
    :return: list of row-group pieces
    """
    # Guard (sketch): petastorm keeps the per-file row group counts in the key/value metadata
    # of the dataset's _common_metadata file; if the key is missing, a metadata error points
    # the user at materialize_dataset / petastorm-generate-metadata.py.
    dataset_metadata_dict = dataset.common_metadata.metadata
    if ROW_GROUPS_PER_FILE_KEY not in dataset_metadata_dict:
        raise PetastormMetadataError(
            'Could not find row group metadata. Use materialize_dataset(..) to generate'
            ' this file in your ETL code.'
            ' You can generate it on an existing dataset using petastorm-generate-metadata.py')
    metadata_dict_key = ROW_GROUPS_PER_FILE_KEY
    row_groups_per_file = json.loads(dataset_metadata_dict[metadata_dict_key].decode())

    rowgroups = []
    # Force the order of pieces. The order is not deterministic since it depends on the
    # multithreaded directory listing implementation inside pyarrow. We stabilize the order
    # here so we get a reproducible order when piece shuffling is off. This also enables
    # implementing piece shuffling given a seed.
    sorted_pieces = sorted(dataset.pieces, key=attrgetter('path'))
    for piece in sorted_pieces:
        # If we are not using absolute paths, we need to convert the path to a relative path
        # for looking up the number of row groups.
        row_groups_key = os.path.relpath(piece.path, dataset.paths)

        for row_group in range(row_groups_per_file[row_groups_key]):
            rowgroups.append(compat_make_parquet_piece(piece.path, dataset.fs.open, row_group=row_group,
                                                       partition_keys=piece.partition_keys))

    return rowgroups
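
# Illustration (hypothetical values): the metadata entry decoded above is a JSON dict that maps
# each parquet file path, relative to the dataset root, to its number of row groups, e.g.
#
#     row_groups_per_file = {'part-00000.parquet': 3, 'year=2020/part-00001.parquet': 1}
#
# so the loop emits one piece per (file, row group) pair.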


def split_pieces_by_row_groups(dataset, row_groups_per_file):
    # Hypothetical wrapper name for this older variant of the loop above; here the relative
    # path is resolved against the directory of the _metadata file rather than dataset.paths.
    base_path = os.path.normpath(os.path.dirname(dataset.metadata_path))
    split_pieces = []
    for piece in dataset.pieces:
        # Since the piece paths are absolute, we take the path relative to the dataset base
        # dir to fetch the number of row groups in the file.
        relative_path = os.path.relpath(piece.path, base_path)

        # If the path is not in the metadata file, there are no row groups in that file and
        # the file should be skipped.
        if relative_path not in row_groups_per_file:
            continue

        for row_group in range(row_groups_per_file[relative_path]):
            split_piece = compat_make_parquet_piece(piece.path, dataset.fs.open, row_group=row_group,
                                                    partition_keys=piece.partition_keys)
            split_pieces.append(split_piece)

    return split_pieces


def split_piece(piece):
    # Expand a single parquet piece into one piece per row group. Note: `dataset` is taken from
    # the enclosing scope (in petastorm this helper lives inside the routine that splits row
    # groups by reading the parquet file footers), and the row-group count comes from the footer
    # of the first piece, i.e. all files are assumed to have the same number of row groups.
    metadata = compat_get_metadata(dataset.pieces[0], dataset.fs.open)
    return [compat_make_parquet_piece(piece.path, dataset.fs.open,
                                      row_group=row_group,
                                      partition_keys=piece.partition_keys)
            for row_group in range(metadata.num_row_groups)]
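

def split_all_pieces(dataset):
    # Usage sketch (hypothetical helper, not part of petastorm's public API): expand every piece
    # of an opened pyarrow ParquetDataset into row-group pieces, roughly the way petastorm's
    # footer-based splitting fans the work out over a thread pool. Unlike split_piece above,
    # this reads each file's own footer, which costs extra I/O but stays correct when files
    # differ in their row-group counts.
    from concurrent.futures import ThreadPoolExecutor

    def _split_one(piece):
        metadata = compat_get_metadata(piece, dataset.fs.open)
        return [compat_make_parquet_piece(piece.path, dataset.fs.open, row_group=row_group,
                                          partition_keys=piece.partition_keys)
                for row_group in range(metadata.num_row_groups)]

    with ThreadPoolExecutor(max_workers=10) as pool:
        return [rg for rgs in pool.map(_split_one, dataset.pieces) for rg in rgs]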