Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,46 +126,50 @@ pub(crate) fn build_page_filter(
let mut row_selections = VecDeque::with_capacity(page_index_predicates.len());
for predicate in page_index_predicates {
// `extract_page_index_push_down_predicates` only return predicate with one col.
let col_id = *predicate.need_input_columns_ids().iter().next().unwrap();
let mut selectors = Vec::with_capacity(row_groups.len());
for r in row_groups.iter() {
let rg_offset_indexes = file_offset_indexes.get(*r);
let rg_page_indexes = file_page_indexes.get(*r);
if let (Some(rg_page_indexes), Some(rg_offset_indexes)) =
(rg_page_indexes, rg_offset_indexes)
{
selectors.extend(
prune_pages_in_one_row_group(
&groups[*r],
&predicate,
rg_offset_indexes.get(col_id),
rg_page_indexes.get(col_id),
groups[*r].column(col_id).column_descr(),
file_metrics,
)
.map_err(|e| {
ArrowError::ParquetError(format!(
"Fail in prune_pages_in_one_row_group: {}",
e
))
}),
);
} else {
trace!(
// when building `PruningPredicate`, some single column filter like `abs(i) = 1`
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can go further and support rewriting `abs(i) = 1` to `i = 1 or i = -1` 🤔.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

File #4388

// will be rewritten to `lit(true)`, so it may have an empty `required_columns`.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is why it fails.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for these comments, BTW -- they help both to review the code as well as read it in the future 👍

if let Some(&col_id) = predicate.need_input_columns_ids().iter().next() {
let mut selectors = Vec::with_capacity(row_groups.len());
for r in row_groups.iter() {
let rg_offset_indexes = file_offset_indexes.get(*r);
let rg_page_indexes = file_page_indexes.get(*r);
if let (Some(rg_page_indexes), Some(rg_offset_indexes)) =
(rg_page_indexes, rg_offset_indexes)
{
selectors.extend(
prune_pages_in_one_row_group(
&groups[*r],
&predicate,
rg_offset_indexes.get(col_id),
rg_page_indexes.get(col_id),
groups[*r].column(col_id).column_descr(),
file_metrics,
)
.map_err(|e| {
ArrowError::ParquetError(format!(
"Fail in prune_pages_in_one_row_group: {}",
e
))
}),
);
} else {
trace!(
"Did not have enough metadata to prune with page indexes, falling back, falling back to all rows",
);
// fallback select all rows
let all_selected =
vec![RowSelector::select(groups[*r].num_rows() as usize)];
selectors.push(all_selected);
// fallback select all rows
let all_selected =
vec![RowSelector::select(groups[*r].num_rows() as usize)];
selectors.push(all_selected);
}
}
}
debug!(
debug!(
"Use filter and page index create RowSelection {:?} from predicate: {:?}",
&selectors,
predicate.predicate_expr(),
);
row_selections.push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
row_selections
.push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
}
}
let final_selection = combine_multi_col_selection(row_selections);
let total_skip =
Expand Down
10 changes: 1 addition & 9 deletions datafusion/core/tests/parquet/page_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,6 @@ async fn prune_int32_eq() {
.await;
}
#[tokio::test]
#[ignore]
async fn prune_int32_scalar_fun_and_eq() {
test_prune(
Scenario::Int32,
Expand All @@ -431,7 +430,6 @@ async fn prune_int32_scalar_fun_and_eq() {
}

#[tokio::test]
#[ignore]
async fn prune_int32_scalar_fun() {
test_prune(
Scenario::Int32,
Expand All @@ -444,7 +442,6 @@ async fn prune_int32_scalar_fun() {
}

#[tokio::test]
#[ignore]
async fn prune_int32_complex_expr() {
test_prune(
Scenario::Int32,
Expand All @@ -457,7 +454,6 @@ async fn prune_int32_complex_expr() {
}

#[tokio::test]
#[ignore]
async fn prune_int32_complex_expr_subtract() {
test_prune(
Scenario::Int32,
Expand Down Expand Up @@ -495,22 +491,20 @@ async fn prune_f64_lt() {
}

#[tokio::test]
#[ignore]
async fn prune_f64_scalar_fun_and_gt() {
// result of sql "SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1"
// only use "f >= 0" to prune
test_prune(
Scenario::Float64,
"SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1",
Some(0),
Some(2),
Some(10),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only "f >= 0" is used for pruning, which prunes two pages (5 rows each), so changing the expected value to 10 makes sense.

1,
)
.await;
}

#[tokio::test]
#[ignore]
async fn prune_f64_scalar_fun() {
// result of sql "SELECT * FROM t where abs(f-1) <= 0.000001" is not supported
test_prune(
Expand All @@ -524,7 +518,6 @@ async fn prune_f64_scalar_fun() {
}

#[tokio::test]
#[ignore]
async fn prune_f64_complex_expr() {
// result of sql "SELECT * FROM t where f+1 > 1.1"" is not supported
test_prune(
Expand All @@ -538,7 +531,6 @@ async fn prune_f64_complex_expr() {
}

#[tokio::test]
#[ignore]
async fn prune_f64_complex_expr_subtract() {
// result of sql "SELECT * FROM t where 1-f > 1" is not supported
test_prune(
Expand Down