diff --git a/parquet/benches/arrow_reader_clickbench.rs b/parquet/benches/arrow_reader_clickbench.rs index 8635a5955715..5a6fb36d5800 100644 --- a/parquet/benches/arrow_reader_clickbench.rs +++ b/parquet/benches/arrow_reader_clickbench.rs @@ -638,66 +638,6 @@ fn find_file_if_exists(mut current_dir: PathBuf, file_name: &str) -> Option `ProjectionMask` will be `[true, false, true]` = `[A, C]` -/// -/// `FilterIndices` will be `[1, 0]`, because column `C` (index 0 in -/// filter_columns) is selected at index 1 of the `ProjectionMask` and column -/// `A` (index 1 in `filter_columns`) is selected at index 0 of the -/// `ProjectionMask`. -struct FilterIndices { - /// * index is offset in Query::filter_columns - /// * value is offset in column selected by filter ProjectionMask - inner: Vec<usize>, -} - -impl FilterIndices { - /// Create a new `FilterIndices` from a list of column indices - /// - /// Parameters: - /// * `schema_descriptor`: The schema of the file - /// * `filter_schema_indices`: a list of column indices in the schema - fn new(schema_descriptor: &SchemaDescriptor, filter_schema_indices: Vec<usize>) -> Self { - for &filter_index in &filter_schema_indices { - assert!(filter_index < schema_descriptor.num_columns()); - } - // When the columns are selected using a ProjectionMask, they are - // returned in the order of the schema (not the order they were specified) - // - // So if the original schema indices are 5, 1, 3 (select the sixth and - // second and fourth column), the RecordBatch returned will select them - // in order 1, 3, 5, - // - // Thus we need a map to convert back to the original selection order - // `[1, 2, 0]` - let mut reordered: Vec<_> = filter_schema_indices.iter().enumerate().collect(); - reordered.sort_by_key(|(_projection_idx, original_schema_idx)| **original_schema_idx); - let mut inner = vec![0; reordered.len()]; - for (output_idx, (projection_idx, _original_schema_idx)) in - reordered.into_iter().enumerate() - { - inner[projection_idx] = output_idx; 
- } - Self { inner } - } - - /// Given the index of a column in `filter_columns`, return the index of the - /// column in the columns selected from `ProjectionMask` - fn map_column(&self, filter_columns_index: usize) -> usize { - // The selection index is the index in the filter mask - // The inner index is the index in the filter columns - self.inner[filter_columns_index] - } -} - /// Encapsulates the test parameters for a single benchmark struct ReadTest { /// Human identifiable name @@ -706,10 +646,8 @@ struct ReadTest { arrow_reader_metadata: ArrowReaderMetadata, /// Which columns in the file should be projected (decoded after filter)? projection_mask: ProjectionMask, - /// Which columns in the file should be passed to the filter? - filter_mask: ProjectionMask, - /// Mapping from column selected in filter mask to `Query::filter_columns` - filter_indices: FilterIndices, + /// Schema indices for each filter column (in filter_columns order) + filter_schema_indices: Vec<usize>, /// Predicates to apply predicates: Vec, /// How many rows are expected to pass the predicate? @@ -744,16 +682,12 @@ impl ReadTest { }; let filter_schema_indices = column_indices(schema_descr, &filter_columns); - let filter_mask = - ProjectionMask::leaves(schema_descr, filter_schema_indices.iter().cloned()); - let filter_indices = FilterIndices::new(schema_descr, filter_schema_indices); Self { name, arrow_reader_metadata, projection_mask, - filter_mask, - filter_indices, + filter_schema_indices, predicates, expected_row_count, } @@ -851,25 +785,26 @@ impl ReadTest { /// Return a `RowFilter` to apply to the reader. /// - /// Note that since `RowFilter` does not implement Clone, we need to create - /// the filter for each row + /// Each predicate gets a ProjectionMask containing only the single column + /// it needs, rather than all filter columns. This avoids decoding expensive + /// columns (e.g. strings) when evaluating cheap predicates (e.g. integer equality). 
fn row_filter(&self) -> RowFilter { - // Note: The predicates are in terms columns in the filter mask - // but the record batch passed back has columns in the order of the file - // schema + let schema_descr = self + .arrow_reader_metadata + .metadata() + .file_metadata() + .schema_descr(); - // Convert the predicates to ArrowPredicateFn to conform to the RowFilter API let arrow_predicates: Vec<_> = self .predicates .iter() .map(|pred| { - let orig_column_index = pred.column_index(); - let column_index = self.filter_indices.map_column(orig_column_index); + let schema_index = self.filter_schema_indices[pred.column_index()]; + let predicate_mask = ProjectionMask::leaves(schema_descr, [schema_index]); let mut predicate_fn = pred.predicate_fn(); - Box::new(ArrowPredicateFn::new( - self.filter_mask.clone(), - move |batch| (predicate_fn)(batch.column(column_index)), - )) as Box<dyn ArrowPredicate> + Box::new(ArrowPredicateFn::new(predicate_mask, move |batch| { + (predicate_fn)(batch.column(0)) + })) as Box<dyn ArrowPredicate> }) .collect();