Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 9 additions & 45 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ use crate::{
};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion_common::{
aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints,
DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
Expand Down Expand Up @@ -645,39 +643,6 @@ impl LogicalPlan {
Ok(LogicalPlan::Values(Values { schema, values }))
}
LogicalPlan::Filter(Filter { predicate, input }) => {
// todo: should this logic be moved to Filter::try_new?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this PR completes this TODO, per @jonahgao 's suggestion


// filter predicates should not contain aliased expressions so we remove any aliases
// before this logic was added we would have aliases within filters such as for
// benchmark q6:
//
// lineitem.l_shipdate >= Date32(\"8766\")
// AND lineitem.l_shipdate < Date32(\"9131\")
// AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount >=
// Decimal128(Some(49999999999999),30,15)
// AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount <=
// Decimal128(Some(69999999999999),30,15)
// AND lineitem.l_quantity < Decimal128(Some(2400),15,2)

let predicate = predicate
.transform_down(|expr| {
match expr {
Expr::Exists { .. }
| Expr::ScalarSubquery(_)
| Expr::InSubquery(_) => {
// subqueries could contain aliases so we don't recurse into those
Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump))
}
Expr::Alias(_) => Ok(Transformed::new(
expr.unalias(),
true,
TreeNodeRecursion::Jump,
)),
_ => Ok(Transformed::no(expr)),
}
})
.data()?;

Filter::try_new(predicate, input).map(LogicalPlan::Filter)
}
LogicalPlan::Repartition(_) => Ok(self),
Expand Down Expand Up @@ -878,7 +843,7 @@ impl LogicalPlan {
}
LogicalPlan::Filter { .. } => {
assert_eq!(1, expr.len());
let predicate = expr.pop().unwrap().unalias_nested().data;
let predicate = expr.pop().unwrap();

Filter::try_new(predicate, Arc::new(inputs.swap_remove(0)))
.map(LogicalPlan::Filter)
Expand Down Expand Up @@ -2117,6 +2082,9 @@ pub struct Filter {

impl Filter {
/// Create a new filter operator.
///
/// Notes: as Aliases have no effect on the output of a filter operator,
/// they are removed from the predicate expression.
pub fn try_new(predicate: Expr, input: Arc<LogicalPlan>) -> Result<Self> {
// Filter predicates must return a boolean value so we try and validate that here.
// Note that it is not always possible to resolve the predicate expression during plan
Expand Down Expand Up @@ -2940,7 +2908,7 @@ mod tests {
use crate::logical_plan::table_scan;
use crate::{col, exists, in_subquery, lit, placeholder, GroupingSet};

use datafusion_common::tree_node::TreeNodeVisitor;
use datafusion_common::tree_node::{TransformedResult, TreeNodeVisitor};
use datafusion_common::{not_impl_err, Constraint, ScalarValue};

use crate::test::function_stub::count;
Expand Down Expand Up @@ -3500,11 +3468,8 @@ digraph {
}));
let col = schema.field_names()[0].clone();

let filter = Filter::try_new(
Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)))),
scan,
)
.unwrap();
let filter =
Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
assert!(filter.is_scalar());
}

Expand All @@ -3522,8 +3487,7 @@ digraph {
.build()
.unwrap();

let external_filter =
col("foo").eq(Expr::Literal(ScalarValue::Boolean(Some(true))));
let external_filter = col("foo").eq(lit(true));

// after transformation, because plan is not the same anymore,
// the parent plan is built again with call to LogicalPlan::with_new_inputs -> with_new_exprs
Expand Down
11 changes: 2 additions & 9 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,16 +342,9 @@ impl CommonSubexprEliminate {
let input = unwrap_arc(input);
let expr = vec![predicate];
self.try_unary_plan(expr, input, config)?
.transform_data(|(mut new_expr, new_input)| {
.map_data(|(mut new_expr, new_input)| {
assert_eq!(new_expr.len(), 1); // passed in vec![predicate]
let new_predicate = new_expr
.pop()
.unwrap()
.unalias_nested()
.update_data(|new_predicate| (new_predicate, new_input));
Ok(new_predicate)
})?
.map_data(|(new_predicate, new_input)| {
let new_predicate = new_expr.pop().unwrap();
Filter::try_new(new_predicate, Arc::new(new_input))
.map(LogicalPlan::Filter)
})
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,11 +761,11 @@ impl OptimizerRule for PushDownFilter {

// Push down non-unnest filter predicate
// Unnest
// Unenst Input (Projection)
// Unnest Input (Projection)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drive by typo fixup

// -> rewritten to
// Unnest
// Filter
// Unenst Input (Projection)
// Unnest Input (Projection)

let unnest_input = std::mem::take(&mut unnest.input);

Expand Down