Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
invalid_column_names = predicate_column_names - all_schema_names
if invalid_column_names:
raise ValueError('At least some column names requested by the predicate ({}) '
'are not valid schema names: ({})'.format(', '.join(invalid_column_names),
', '.join(all_schema_names)))
other_column_names = all_schema_names - predicate_column_names - self._dataset.partitions.partition_names
# Read columns needed for the predicate
predicate_rows = self._read_with_shuffle_row_drop(piece, pq_file, predicate_column_names,
shuffle_row_drop_partition)
# Decode only the predicate columns of each row that was read
decoded_predicate_rows = [
utils.decode_row(_select_cols(row, predicate_column_names), self._schema)
for row in predicate_rows]
# Use the predicate to filter
match_predicate_mask = [worker_predicate.do_include(row) for row in decoded_predicate_rows]
# No row matched the predicate, so there is nothing to return: exit early.
if not any(match_predicate_mask):
return []
# Remove rows that were filtered out by the predicate
filtered_decoded_predicate_rows = [row for i, row in enumerate(decoded_predicate_rows) if
match_predicate_mask[i]]
if other_column_names:
# Read remaining columns
other_rows = self._read_with_shuffle_row_drop(piece, pq_file, other_column_names,