Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,16 @@ mod tests {
run_query(14).await
}

#[tokio::test]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉

/// Smoke test: TPC-H query 16 must run to completion without error.
async fn run_q16() -> Result<()> {
    let query = 16;
    run_query(query).await
}

// Smoke test: TPC-H query 18 must run to completion without error.
#[tokio::test]
async fn run_q18() -> Result<()> {
    let query = 18;
    run_query(query).await
}

#[tokio::test]
async fn run_q19() -> Result<()> {
run_query(19).await
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::projection_push_down::ProjectionPushDown;
use crate::optimizer::simplify_expressions::SimplifyExpressions;
use crate::optimizer::single_distinct_to_groupby::SingleDistinctToGroupBy;
use crate::optimizer::subquery_filter_to_join::SubqueryFilterToJoin;

use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
Expand Down Expand Up @@ -1199,6 +1200,7 @@ impl SessionState {
// Simplify expressions first to maximize the chance
// of applying other optimizations
Arc::new(SimplifyExpressions::new()),
Arc::new(SubqueryFilterToJoin::new()),
Arc::new(EliminateFilter::new()),
Arc::new(CommonSubexprEliminate::new()),
Arc::new(EliminateLimit::new()),
Expand Down
66 changes: 16 additions & 50 deletions datafusion/core/src/optimizer/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,17 @@

//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan

use crate::error::Result;
use crate::execution::context::ExecutionProps;
use crate::logical_expr::TableProviderFilterPushDown;
use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection, Union};
use crate::logical_plan::{
and, col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan,
col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan,
};
use crate::logical_plan::{DFSchema, Expr};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use crate::{error::Result, logical_plan::Operator};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use std::collections::{HashMap, HashSet};

/// Filter Push Down optimizer rule pushes filter clauses down the plan
/// # Introduction
Expand Down Expand Up @@ -95,23 +92,6 @@ fn push_down(state: &State, plan: &LogicalPlan) -> Result<LogicalPlan> {
utils::from_plan(plan, &expr, &new_inputs)
}

/// Returns a new [LogicalPlan] that wraps `plan` in a [LogicalPlan::Filter]
/// whose predicate is the conjunction (`AND`) of all `predicates`.
///
/// If `predicates` is empty, `plan` is returned unchanged rather than
/// panicking on the out-of-bounds index `predicates[0]`.
fn add_filter(plan: LogicalPlan, predicates: &[&Expr]) -> LogicalPlan {
    // Guard the empty case explicitly; the fold below needs a seed element.
    let (first, rest) = match predicates.split_first() {
        Some(split) => split,
        None => return plan,
    };

    // reduce filters to a single filter with an AND
    let predicate = rest
        .iter()
        .fold((*first).clone(), |acc, predicate| {
            and(acc, (*predicate).to_owned())
        });

    LogicalPlan::Filter(Filter {
        predicate,
        input: Arc::new(plan),
    })
}

// remove all filters from `filters` that are in `predicate_columns`
fn remove_filters(
filters: &[(Expr, HashSet<Column>)],
Expand Down Expand Up @@ -150,32 +130,14 @@ fn issue_filters(
return push_down(&state, plan);
}

let plan = add_filter(plan.clone(), &predicates);
let plan = utils::add_filter(plan.clone(), &predicates);

state.filters = remove_filters(&state.filters, &predicate_columns);

// continue optimization over all input nodes by cloning the current state (i.e. each node is independent)
push_down(&state, &plan)
}

/// Flattens a conjunction into its members: "A AND B AND C" => [A, B, C].
/// Aliases are looked through transparently; any other expression is a leaf.
fn split_members<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>) {
    match predicate {
        Expr::BinaryExpr {
            left,
            op: Operator::And,
            right,
        } => {
            // Recurse into both operands so nested ANDs are fully flattened.
            split_members(left, predicates);
            split_members(right, predicates);
        }
        // An alias wraps the expression we actually care about.
        Expr::Alias(nested, _) => split_members(nested, predicates),
        leaf => predicates.push(leaf),
    }
}

// For a given JOIN logical plan, determine whether each side of the join is preserved.
// We say a join side is preserved if the join returns all or a subset of the rows from
// the relevant side, such that each row of the output table directly maps to a row of
Expand Down Expand Up @@ -289,7 +251,7 @@ fn optimize_join(
Ok(plan)
} else {
// wrap the join on the filter whose predicates must be kept
let plan = add_filter(plan, &to_keep.0);
let plan = utils::add_filter(plan, &to_keep.0);
state.filters = remove_filters(&state.filters, &to_keep.1);

Ok(plan)
Expand All @@ -305,7 +267,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
LogicalPlan::Analyze { .. } => push_down(&state, plan),
LogicalPlan::Filter(Filter { input, predicate }) => {
let mut predicates = vec![];
split_members(predicate, &mut predicates);
utils::split_conjunction(predicate, &mut predicates);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


// Predicates without referencing columns (WHERE FALSE, WHERE 1=1, etc.)
let mut no_col_predicates = vec![];
Expand All @@ -328,7 +290,10 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
// As those contain only literals, they could be optimized using constant folding
// and removal of WHERE TRUE / WHERE FALSE
if !no_col_predicates.is_empty() {
Ok(add_filter(optimize(input, state)?, &no_col_predicates))
Ok(utils::add_filter(
optimize(input, state)?,
&no_col_predicates,
))
} else {
optimize(input, state)
}
Expand Down Expand Up @@ -592,17 +557,18 @@ fn rewrite(expr: &Expr, projection: &HashMap<String, Expr>) -> Result<Expr> {

#[cfg(test)]
mod tests {
use std::sync::Arc;

use super::*;
use crate::datasource::TableProvider;
use crate::logical_plan::plan::provider_as_source;
use crate::logical_plan::{
lit, sum, union_with_alias, DFSchema, Expr, LogicalPlanBuilder, Operator,
and, col, lit, sum, union_with_alias, DFSchema, Expr, LogicalPlanBuilder,
Operator,
};
use crate::physical_plan::ExecutionPlan;
use crate::prelude::JoinType;
use crate::test::*;
use crate::{
logical_plan::{col, plan::provider_as_source},
prelude::JoinType,
};

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ pub mod optimizer;
pub mod projection_push_down;
pub mod simplify_expressions;
pub mod single_distinct_to_groupby;
pub mod subquery_filter_to_join;
pub mod utils;
Loading