Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 16 additions & 81 deletions parquet/benches/arrow_reader_clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,66 +638,6 @@ fn find_file_if_exists(mut current_dir: PathBuf, file_name: &str) -> Option<Path
None
}

/// Represents a mapping from each column selected in the `ProjectionMask`
/// created from `filter_columns`, to the corresponding index in the list of
/// `filter_columns`?
///
/// # Example
///
/// If:
/// * the file schema has columns `[A, B, C]`
/// * `filter_columns` is `[C, A]`
/// * ==> `ProjectionMask` will be `[true, false, true]` = `[A, C]`
///
/// `FilterIndices` will be `[1, 0]`, because column `C` (index 0 in
/// filter_columns) is selected at index 1 of the `ProjectionMask` and column
/// `A` (index 1 in `filter_columns`) is selected at index 0 of the
/// `ProjectionMask`.
struct FilterIndices {
/// * index is offset in Query::filter_columns
/// * value is offset in column selected by filter ProjectionMask
inner: Vec<usize>,
}

impl FilterIndices {
/// Create a new `FilterIndices` from a list of column indices
///
/// Parameters:
/// * `schema_descriptor`: The schema of the file
/// * `filter_schema_indices`: a list of column indices in the schema
fn new(schema_descriptor: &SchemaDescriptor, filter_schema_indices: Vec<usize>) -> Self {
for &filter_index in &filter_schema_indices {
assert!(filter_index < schema_descriptor.num_columns());
}
// When the columns are selected using a ProjectionMask, they are
// returned in the order of the schema (not the order they were specified)
//
// So if the original schema indices are 5, 1, 3 (select the sixth and
// second and fourth column), the RecordBatch returned will select them
// in order 1, 3, 5,
//
// Thus we need a map to convert back to the original selection order
// `[1, 2, 0]`
let mut reordered: Vec<_> = filter_schema_indices.iter().enumerate().collect();
reordered.sort_by_key(|(_projection_idx, original_schema_idx)| **original_schema_idx);
let mut inner = vec![0; reordered.len()];
for (output_idx, (projection_idx, _original_schema_idx)) in
reordered.into_iter().enumerate()
{
inner[projection_idx] = output_idx;
}
Self { inner }
}

/// Given the index of a column in `filter_columns`, return the index of the
/// column in the columns selected from `ProjectionMask`
fn map_column(&self, filter_columns_index: usize) -> usize {
// The selection index is the index in the filter mask
// The inner index is the index in the filter columns
self.inner[filter_columns_index]
}
}

/// Encapsulates the test parameters for a single benchmark
struct ReadTest {
/// Human identifiable name
Expand All @@ -706,10 +646,8 @@ struct ReadTest {
arrow_reader_metadata: ArrowReaderMetadata,
/// Which columns in the file should be projected (decoded after filter)?
projection_mask: ProjectionMask,
/// Which columns in the file should be passed to the filter?
filter_mask: ProjectionMask,
/// Mapping from column selected in filter mask to `Query::filter_columns`
filter_indices: FilterIndices,
/// Schema indices for each filter column (in filter_columns order)
filter_schema_indices: Vec<usize>,
/// Predicates to apply
predicates: Vec<ClickBenchPredicate>,
/// How many rows are expected to pass the predicate?
Expand Down Expand Up @@ -744,16 +682,12 @@ impl ReadTest {
};

let filter_schema_indices = column_indices(schema_descr, &filter_columns);
let filter_mask =
ProjectionMask::leaves(schema_descr, filter_schema_indices.iter().cloned());
let filter_indices = FilterIndices::new(schema_descr, filter_schema_indices);

Self {
name,
arrow_reader_metadata,
projection_mask,
filter_mask,
filter_indices,
filter_schema_indices,
predicates,
expected_row_count,
}
Expand Down Expand Up @@ -851,25 +785,26 @@ impl ReadTest {

/// Return a `RowFilter` to apply to the reader.
///
/// Note that since `RowFilter` does not implement Clone, we need to create
/// the filter for each row
/// Each predicate gets a ProjectionMask containing only the single column
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

/// it needs, rather than all filter columns. This avoids decoding expensive
/// columns (e.g. strings) when evaluating cheap predicates (e.g. integer equality).
fn row_filter(&self) -> RowFilter {
// Note: The predicates are in terms columns in the filter mask
// but the record batch passed back has columns in the order of the file
// schema
let schema_descr = self
.arrow_reader_metadata
.metadata()
.file_metadata()
.schema_descr();

// Convert the predicates to ArrowPredicateFn to conform to the RowFilter API
let arrow_predicates: Vec<_> = self
.predicates
.iter()
.map(|pred| {
let orig_column_index = pred.column_index();
let column_index = self.filter_indices.map_column(orig_column_index);
let schema_index = self.filter_schema_indices[pred.column_index()];
let predicate_mask = ProjectionMask::leaves(schema_descr, [schema_index]);
let mut predicate_fn = pred.predicate_fn();
Box::new(ArrowPredicateFn::new(
self.filter_mask.clone(),
move |batch| (predicate_fn)(batch.column(column_index)),
)) as Box<dyn ArrowPredicate>
Box::new(ArrowPredicateFn::new(predicate_mask, move |batch| {
(predicate_fn)(batch.column(0))
})) as Box<dyn ArrowPredicate>
})
.collect();

Expand Down
Loading