"""
    # Resolver in executor context will get hadoop config from environment
    resolver = FilesystemResolver(dataset_url, hdfs_driver=hdfs_driver)
    fs = resolver.filesystem()

    # Create pyarrow piece
    piece = compat_make_parquet_piece(piece_info.path, fs.open, row_group=piece_info.row_group,
                                      partition_keys=piece_info.partition_keys)

    # Collect column names needed for indexing
    column_names = set()
    for indexer in indexers:
        column_names.update(indexer.column_names)

    # Read the columns needed for indexing
    column_rows = compat_piece_read(piece, fs.open, columns=list(column_names),
                                    partitions=partitions).to_pandas().to_dict('records')

    # Decode column values
    decoded_rows = [utils.decode_row(row, schema) for row in column_rows]
    if not decoded_rows:
        raise ValueError('Cannot build index with empty decoded_rows, columns: {}, partitions: {}'
                         .format(column_names, partitions))

    # Index column values
    for indexer in indexers:
        indexer.build_index(decoded_rows, piece_info.piece_index)

    # Indexer objects contain the index data; they will be consolidated during the reduce phase
    return indexers
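
# A minimal sketch of the "reduce phase" mentioned above, assuming each indexer exposes
# some way to fold in another indexer's per-piece data -- the `merge` method here is
# hypothetical, not part of petastorm's API:
def _merge_indexers(indexers_a, indexers_b):
    for indexer_a, indexer_b in zip(indexers_a, indexers_b):
        indexer_a.merge(indexer_b)  # hypothetical: consolidate index data from another piece
    return indexers_a
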
def _read_with_shuffle_row_drop(self, piece, pq_file, column_names, shuffle_row_drop_partition):
    # If integer_object_nulls is set to False, nullable integer fields are returned as floats
    # with nulls translated to nans
    data_frame = compat_piece_read(piece, lambda _: pq_file, columns=column_names,
                                   partitions=self._dataset.partitions).to_pandas(integer_object_nulls=True)

    num_rows = len(data_frame)
    num_partitions = shuffle_row_drop_partition[1]
    this_partition = shuffle_row_drop_partition[0]

    partition_indexes = np.floor(np.arange(num_rows) / (float(num_rows) / min(num_rows, num_partitions)))
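    # e.g. num_rows=10, num_partitions=3: np.floor(np.arange(10) / (10 / 3.0)) gives
    # [0 0 0 0 1 1 1 2 2 2] -- rows split into near-equal contiguous chunks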

    if self._ngram:
        # If we have an ngram we need to take elements from the next partition to build the sequence
        next_partition_indexes = np.where(partition_indexes >= this_partition + 1)
        if next_partition_indexes[0].size:
            next_partition_to_add = next_partition_indexes[0][0:self._ngram.length - 1]
            partition_indexes[next_partition_to_add] = this_partition
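            # e.g. with partition_indexes [0 0 0 0 1 1 1 2 2 2], this_partition=0 and an
            # ngram of length 3, the first 2 rows of the next chunk (rows 4 and 5) are
            # reassigned to partition 0 so boundary-spanning sequences stay intact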

    selected_dataframe = data_frame.loc[partition_indexes == this_partition]
    # Return the selected rows as a list of dicts, one per row
    return selected_dataframe.to_dict('records')
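
# An alternative variant of the same method that keeps the data as a pyarrow Table instead
# of converting to pandas row dicts (pandas is only used transiently when rows must be dropped):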
def _read_with_shuffle_row_drop(self, piece, pq_file, column_names, shuffle_row_drop_partition):
    table = compat_piece_read(piece, lambda _: pq_file, columns=column_names,
                              partitions=self._dataset.partitions)

    num_rows = len(table)
    num_partitions = shuffle_row_drop_partition[1]
    this_partition = shuffle_row_drop_partition[0]

    if num_partitions > 1:
        data_frame_pandas = table.to_pandas()
        partition_indexes = np.floor(np.arange(num_rows) / (float(num_rows) / min(num_rows, num_partitions)))

        table = pa.Table.from_pandas(data_frame_pandas.loc[partition_indexes == this_partition],
                                     preserve_index=False)

    return table
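
# A minimal, runnable sketch of the shuffle-row-drop selection above. The helper name is
# illustrative, not petastorm API; it assumes a pyarrow Table input and uses the same
# floor-division chunking as the methods above.
import pandas as pd

def select_row_drop_partition(table, this_partition, num_partitions):
    num_rows = len(table)
    partition_indexes = np.floor(np.arange(num_rows) / (float(num_rows) / min(num_rows, num_partitions)))
    # Keep only the rows whose chunk index matches the requested partition
    return pa.Table.from_pandas(table.to_pandas().loc[partition_indexes == this_partition],
                                preserve_index=False)

# Usage: keep the middle third of a 10-row table -> rows with x in [4, 5, 6]
example = pa.Table.from_pandas(pd.DataFrame({'x': range(10)}))
assert select_row_drop_partition(example, 1, 3).to_pandas()['x'].tolist() == [4, 5, 6]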