# Assumed module-level imports for this snippet: json, os, pyarrow.parquet as pq
base_path = dataset.paths

def get_row_group_info(path):
    fs = filesystem_factory()
    relative_path = os.path.relpath(path, base_path)
    pq_file = fs.open(path)
    num_row_groups = pq.read_metadata(pq_file).num_row_groups
    pq_file.close()
    return relative_path, num_row_groups

# `paths` is assumed to be the list of parquet file paths of the dataset, collected earlier
row_groups = spark_context.parallelize(paths, len(paths)) \
    .map(get_row_group_info) \
    .collect()

num_row_groups_str = json.dumps(dict(row_groups))
# Add the dict with the number of row groups in each file to the parquet file metadata footer
utils.add_to_dataset_metadata(dataset, ROW_GROUPS_PER_FILE_KEY, num_row_groups_str)
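# A minimal sketch (not part of the code above) of how the JSON stored under ROW_GROUPS_PER_FILE_KEY
# could be read back from a Parquet footer with pyarrow. The function name and the metadata path
# argument are illustrative, and the key is assumed to be a bytes constant, matching the bytes keys
# pyarrow exposes.
import json
import pyarrow.parquet as pq

def read_row_group_counts(metadata_path, key=ROW_GROUPS_PER_FILE_KEY):
    # Key-value metadata in the Parquet footer is exposed as a dict of bytes (or None if absent)
    kv_metadata = pq.read_metadata(metadata_path).metadata or {}
    payload = kv_metadata.get(key)
    return json.loads(payload.decode('utf-8')) if payload else {}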
for piece_index in range(pieces_num):
    # Indexes rely on the ordering of the split dataset pieces. That ordering is determined by how the
    # pieces are split and sorted; it should not change, but if it ever did, it would break this indexing.
    # (A sketch of the PieceInfo record follows this snippet.)
    piece = split_pieces[piece_index]
    piece_info_list.append(PieceInfo(piece_index, piece.path, piece.row_group, piece.partition_keys))
start_time = time.time()
piece_info_rdd = spark_context.parallelize(piece_info_list, min(len(piece_info_list), PARALLEL_SLICE_NUM))
indexer_rdd = piece_info_rdd.map(lambda piece_info: _index_columns(piece_info, dataset_url, partitions,
                                                                   indexers, schema, hdfs_driver=hdfs_driver))
indexer_list = indexer_rdd.reduce(_combine_indexers)
indexer_dict = {indexer.index_name: indexer for indexer in indexer_list}
serialized_indexers = pickle.dumps(indexer_dict, pickle.HIGHEST_PROTOCOL)
utils.add_to_dataset_metadata(dataset, ROWGROUPS_INDEX_KEY, serialized_indexers)
logger.info("Elapsed time of index creation: %f s", (time.time() - start_time))
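# PieceInfo is used above as a lightweight record shipped to the Spark workers. A hedged sketch of a
# declaration with exactly the fields this code touches (piece_index, path, row_group, partition_keys);
# the real class may carry more.
from collections import namedtuple

PieceInfo = namedtuple('PieceInfo', ['piece_index', 'path', 'row_group', 'partition_keys'])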
# Create pyarrow piece
piece = compat_make_parquet_piece(piece_info.path, fs.open, row_group=piece_info.row_group,
                                  partition_keys=piece_info.partition_keys)
# Collect column names needed for indexing
column_names = set()
for indexer in indexers:
    column_names.update(indexer.column_names)
# Read columns needed for indexing
column_rows = compat_piece_read(piece, fs.open, columns=list(column_names),
                                partitions=partitions).to_pandas().to_dict('records')

# Decode column values
decoded_rows = [utils.decode_row(row, schema) for row in column_rows]
if not decoded_rows:
    raise ValueError('Cannot build index with empty decoded_rows, columns: {}, partitions: {}'
                     .format(column_names, partitions))

# Index column values
for indexer in indexers:
    indexer.build_index(decoded_rows, piece_info.piece_index)

# Indexer objects contain the index data; it will be consolidated in the reduce phase
return indexers
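# The loop above only requires that an indexer expose `index_name`, `column_names` and
# `build_index(decoded_rows, piece_index)`. A minimal, hypothetical indexer honoring that contract
# (not the library's real implementation): it maps every value of one field to the set of piece
# indexes containing it.
class FieldValueIndexer(object):
    """Hypothetical single-field indexer: field value -> set of piece indexes containing it."""

    def __init__(self, index_name, field_name):
        self.index_name = index_name
        self._field_name = field_name
        self._index = {}

    @property
    def column_names(self):
        # Columns that must be read from each piece in order to build this index
        return [self._field_name]

    def build_index(self, decoded_rows, piece_index):
        for row in decoded_rows:
            self._index.setdefault(row[self._field_name], set()).add(piece_index)

    def get_row_group_indexes(self, value):
        # Pieces (row groups) that contain at least one row with the given value
        return self._index.get(value, set())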
def _load_rows(self, pq_file, piece, shuffle_row_drop_range):
    """Loads all rows from a piece"""

    # pyarrow would fail if we requested column names that the dataset is partitioned by, so we strip
    # them from the `columns` argument.
    partitions = self._dataset.partitions
    column_names = set(field.name for field in self._schema.fields.values()) - partitions.partition_names
    all_rows = self._read_with_shuffle_row_drop(piece, pq_file, column_names, shuffle_row_drop_range)

    all_rows = [utils.decode_row(row, self._schema) for row in all_rows]

    if self._transform_spec:
        all_rows = _apply_transform_spec(all_rows, self._transform_spec)

    return all_rows
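# _read_with_shuffle_row_drop is not shown in this snippet. A hedged sketch of the row-drop idea it
# relies on, assuming shuffle_row_drop_range is an (index, count) pair: the rows of a piece are split
# into `count` contiguous slices and only slice `index` is kept, so different read requests can sample
# disjoint portions of the same row group. The helper below is illustrative only; the real method
# operates on the parquet data itself.
import math

def _drop_rows_sketch(rows, shuffle_row_drop_range):
    this_slice, num_slices = shuffle_row_drop_range
    slice_size = int(math.ceil(len(rows) / float(num_slices)))
    start = this_slice * slice_size
    return rows[start:start + slice_size]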
.map(lambda row: utils.decode_row(row.asDict(), schema)) \
.map(lambda record: schema.make_namedtuple(**record))
return []
# Remove rows that were filtered out by the predicate
filtered_decoded_predicate_rows = [row for i, row in enumerate(decoded_predicate_rows) if
                                   match_predicate_mask[i]]

if other_column_names:
    # Read the remaining columns
    other_rows = self._read_with_shuffle_row_drop(piece, pq_file, other_column_names,
                                                  shuffle_row_drop_partition)

    # Remove rows that were filtered out by the predicate
    filtered_other_rows = [row for i, row in enumerate(other_rows) if match_predicate_mask[i]]

    # Decode the remaining columns
    decoded_other_rows = [utils.decode_row(row, self._schema) for row in filtered_other_rows]

    # Merge the columns needed by the predicate with the remaining columns
    all_cols = [_merge_two_dicts(a, b) for a, b in zip(decoded_other_rows, filtered_decoded_predicate_rows)]
    result = all_cols
else:
    result = filtered_decoded_predicate_rows

if self._transform_spec:
    result = _apply_transform_spec(result, self._transform_spec)

return result
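# _merge_two_dicts is not defined in this snippet; the merge above only needs to combine a row's
# predicate columns with its remaining columns (the two dicts have disjoint keys here). A minimal
# sketch of a helper with those semantics:
def _merge_two_dicts(a, b):
    # Shallow merge; on a key collision the value from `b` wins
    merged = a.copy()
    merged.update(b)
    return merged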