From 27069f0f6fd19a53c71d8842c6a2dabccbeeba8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 27 Sep 2023 15:03:53 +0200 Subject: [PATCH] Don't add filters to used columns Add test WIP fix Fix filter after scan Totally reemove filter to column extraction Fix test Update tests 1 Update tests 2 Update tests 3 --- datafusion/core/src/dataframe.rs | 9 +- datafusion/core/src/datasource/view.rs | 27 +-- datafusion/expr/src/logical_plan/builder.rs | 11 +- datafusion/optimizer/src/push_down_filter.rs | 175 ++++++------------ .../optimizer/src/push_down_projection.rs | 34 +++- .../simplify_expressions/simplify_exprs.rs | 3 +- .../src/unwrap_cast_in_comparison.rs | 8 +- 7 files changed, 127 insertions(+), 140 deletions(-) diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 2967b1823f374..640f57f3d5fc2 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -1287,15 +1287,16 @@ impl TableProvider for DataFrameTableProvider { limit: Option, ) -> Result> { let mut expr = LogicalPlanBuilder::from(self.plan.clone()); - if let Some(p) = projection { - expr = expr.select(p.iter().copied())? - } - // Add filter when given let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new)); if let Some(filter) = filter { expr = expr.filter(filter)? } + + if let Some(p) = projection { + expr = expr.select(p.iter().copied())? + } + // add a limit if given if let Some(l) = limit { expr = expr.limit(0, Some(l))? diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index 210acad18c930..d58284d1bac50 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -108,12 +108,20 @@ impl TableProvider for ViewTable { filters: &[Expr], limit: Option, ) -> Result> { - let plan = if let Some(projection) = projection { + let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new)); + let plan = self.logical_plan().clone(); + let mut plan = LogicalPlanBuilder::from(plan); + + if let Some(filter) = filter { + plan = plan.filter(filter)?; + } + + let mut plan = if let Some(projection) = projection { // avoiding adding a redundant projection (e.g. SELECT * FROM view) let current_projection = - (0..self.logical_plan.schema().fields().len()).collect::>(); + (0..plan.schema().fields().len()).collect::>(); if projection == ¤t_projection { - self.logical_plan().clone() + plan } else { let fields: Vec = projection .iter() @@ -123,19 +131,11 @@ impl TableProvider for ViewTable { ) }) .collect(); - LogicalPlanBuilder::from(self.logical_plan.clone()) - .project(fields)? - .build()? + plan.project(fields)? } } else { - self.logical_plan().clone() + plan }; - let mut plan = LogicalPlanBuilder::from(plan); - let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new)); - - if let Some(filter) = filter { - plan = plan.filter(filter)?; - } if let Some(limit) = limit { plan = plan.limit(0, Some(limit))?; @@ -439,6 +439,7 @@ mod tests { .select_columns(&["bool_col", "int_col"])?; let plan = df.explain(false, false)?.collect().await?; + // Filters all the way to Parquet let formatted = arrow::util::pretty::pretty_format_batches(&plan) .unwrap() diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index eb2123b7c27af..6171d43b37f50 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -26,7 +26,9 @@ use crate::expr_rewriter::{ }; use crate::type_coercion::binary::comparison_coercion; use crate::utils::{columnize_expr, compare_sort_expr}; -use crate::{and, binary_expr, DmlStatement, Operator, WriteOp}; +use crate::{ + and, binary_expr, DmlStatement, Operator, TableProviderFilterPushDown, WriteOp, +}; use crate::{ logical_plan::{ Aggregate, Analyze, CrossJoin, Distinct, EmptyRelation, Explain, Filter, Join, @@ -1402,6 +1404,13 @@ impl TableSource for LogicalTableSource { fn schema(&self) -> SchemaRef { self.table_schema.clone() } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result> { + Ok(vec![TableProviderFilterPushDown::Exact; filters.len()]) + } } /// Create a [`LogicalPlan::Unnest`] plan diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 571e2146c4e04..fe726d5d77830 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -944,8 +944,7 @@ mod tests { // filter is before projection let expected = "\ Projection: test.a, test.b\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a = Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -972,9 +971,7 @@ mod tests { let plan = LogicalPlanBuilder::from(table_scan) .filter(lit(0i64).eq(lit(1i64)))? .build()?; - let expected = "\ - Filter: Int64(0) = Int64(1)\ - \n TableScan: test"; + let expected = "TableScan: test, full_filters=[Int64(0) = Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -990,8 +987,7 @@ mod tests { let expected = "\ Projection: test.c, test.b\ \n Projection: test.a, test.b, test.c\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a = Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1005,8 +1001,7 @@ mod tests { // filter of key aggregation is commutative let expected = "\ Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b) AS total_salary]]\ - \n Filter: test.a > Int64(10)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a > Int64(10)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1031,8 +1026,7 @@ mod tests { .build()?; let expected = "Aggregate: groupBy=[[test.b + test.a]], aggr=[[SUM(test.a), test.b]]\ - \n Filter: test.b + test.a > Int64(10)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.b + test.a > Int64(10)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1062,8 +1056,7 @@ mod tests { // filter is before projection let expected = "\ Projection: test.a AS b, test.c\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a = Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1107,8 +1100,7 @@ mod tests { // filter is before projection let expected = "\ Projection: test.a * Int32(2) + test.c AS b, test.c\ - \n Filter: test.a * Int32(2) + test.c = Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a * Int32(2) + test.c = Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1140,8 +1132,7 @@ mod tests { let expected = "\ Projection: b * Int32(3) AS a, test.c\ \n Projection: test.a * Int32(2) + test.c AS b, test.c\ - \n Filter: (test.a * Int32(2) + test.c) * Int32(3) = Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[(test.a * Int32(2) + test.c) * Int32(3) = Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1204,8 +1195,7 @@ mod tests { // Push filter below NoopPlan let expected = "\ NoopPlan\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a = Int64(1)]"; assert_optimized_plan_eq(&plan, expected)?; let custom_plan = LogicalPlan::Extension(Extension { @@ -1222,8 +1212,7 @@ mod tests { let expected = "\ Filter: test.c = Int64(2)\ \n NoopPlan\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a = Int64(1)]"; assert_optimized_plan_eq(&plan, expected)?; let custom_plan = LogicalPlan::Extension(Extension { @@ -1239,10 +1228,8 @@ mod tests { // Push filter below NoopPlan for each child branch let expected = "\ NoopPlan\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a = Int64(1)]\ + \n TableScan: test, full_filters=[test.a = Int64(1)]"; assert_optimized_plan_eq(&plan, expected)?; let custom_plan = LogicalPlan::Extension(Extension { @@ -1259,10 +1246,8 @@ mod tests { let expected = "\ Filter: test.c = Int64(2)\ \n NoopPlan\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a = Int64(1)]\ + \n TableScan: test, full_filters=[test.a = Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1295,8 +1280,7 @@ mod tests { Filter: SUM(test.c) > Int64(10)\ \n Aggregate: groupBy=[[b]], aggr=[[SUM(test.c)]]\ \n Projection: test.a AS b, test.c\ - \n Filter: test.a > Int64(10)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a > Int64(10)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1330,8 +1314,7 @@ mod tests { Filter: SUM(test.c) > Int64(10) AND SUM(test.c) < Int64(20)\ \n Aggregate: groupBy=[[b]], aggr=[[SUM(test.c)]]\ \n Projection: test.a AS b, test.c\ - \n Filter: test.a > Int64(10)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a > Int64(10)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1367,10 +1350,8 @@ mod tests { .build()?; // filter appears below Union let expected = "Union\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test\ - \n Filter: test2.a = Int64(1)\ - \n TableScan: test2"; + \n TableScan: test, full_filters=[test.a = Int64(1)]\ + \n TableScan: test2, full_filters=[test2.a = Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1390,12 +1371,10 @@ mod tests { // filter appears below Union let expected = "Union\n SubqueryAlias: test2\ \n Projection: test.a AS b\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a = Int64(1)]\ \n SubqueryAlias: test2\ \n Projection: test.a AS b\ - \n Filter: test.a = Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a = Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1423,11 +1402,9 @@ mod tests { let expected = "Projection: test.a, test1.d\ \n CrossJoin:\ \n Projection: test.a, test.b, test.c\ - \n Filter: test.a = Int32(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a = Int32(1)]\ \n Projection: test1.d, test1.e, test1.f\ - \n Filter: test1.d > Int32(2)\ - \n TableScan: test1"; + \n TableScan: test1, full_filters=[test1.d > Int32(2)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1452,11 +1429,9 @@ mod tests { let expected = "Projection: test.a, test1.a\ \n CrossJoin:\ \n Projection: test.a, test.b, test.c\ - \n Filter: test.a = Int32(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a = Int32(1)]\ \n Projection: test1.a, test1.b, test1.c\ - \n Filter: test1.a > Int32(2)\ - \n TableScan: test1"; + \n TableScan: test1, full_filters=[test1.a > Int32(2)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1489,8 +1464,7 @@ mod tests { \n Filter: test.a >= Int64(1)\ \n Limit: skip=0, fetch=1\ \n Projection: test.a\ - \n Filter: test.a <= Int64(1)\ - \n TableScan: test"; + \n TableScan: test, full_filters=[test.a <= Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1544,6 +1518,10 @@ mod tests { // not part of the test assert_eq!(format!("{plan:?}"), expected); + let expected = "\ + TestUserDefined\ + \n TableScan: test, full_filters=[test.a <= Int64(1)]"; + assert_optimized_plan_eq(&plan, expected) } @@ -1579,11 +1557,9 @@ mod tests { // filter sent to side before the join let expected = "\ Inner Join: test.a = test2.a\ - \n Filter: test.a <= Int64(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a <= Int64(1)]\ \n Projection: test2.a\ - \n Filter: test2.a <= Int64(1)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.a <= Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1618,11 +1594,9 @@ mod tests { // filter sent to side before the join let expected = "\ Inner Join: Using test.a = test2.a\ - \n Filter: test.a <= Int64(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a <= Int64(1)]\ \n Projection: test2.a\ - \n Filter: test2.a <= Int64(1)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.a <= Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1704,8 +1678,7 @@ mod tests { let expected = "\ Inner Join: test.a = test2.a\ \n Projection: test.a, test.b\ - \n Filter: test.b <= Int64(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.b <= Int64(1)]\ \n Projection: test2.a, test2.c\ \n TableScan: test2"; assert_optimized_plan_eq(&plan, expected) @@ -1820,8 +1793,7 @@ mod tests { // filter sent to left side of the join, not the right let expected = "\ Left Join: Using test.a = test2.a\ - \n Filter: test.a <= Int64(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a <= Int64(1)]\ \n Projection: test2.a\ \n TableScan: test2"; assert_optimized_plan_eq(&plan, expected) @@ -1861,8 +1833,7 @@ mod tests { Right Join: Using test.a = test2.a\ \n TableScan: test\ \n Projection: test2.a\ - \n Filter: test2.a <= Int64(1)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.a <= Int64(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1903,11 +1874,9 @@ mod tests { let expected = "\ Inner Join: test.a = test2.a Filter: test.b < test2.b\ \n Projection: test.a, test.b, test.c\ - \n Filter: test.c > UInt32(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.c > UInt32(1)]\ \n Projection: test2.a, test2.b, test2.c\ - \n Filter: test2.c > UInt32(4)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.c > UInt32(4)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1947,11 +1916,9 @@ mod tests { let expected = "\ Inner Join: test.a = test2.a\ \n Projection: test.a, test.b, test.c\ - \n Filter: test.b > UInt32(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.b > UInt32(1)]\ \n Projection: test2.a, test2.b, test2.c\ - \n Filter: test2.c > UInt32(4)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.c > UInt32(4)]"; assert_optimized_plan_eq(&plan, expected) } @@ -1989,11 +1956,9 @@ mod tests { let expected = "\ Inner Join: test.a = test2.b\ \n Projection: test.a\ - \n Filter: test.a > UInt32(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a > UInt32(1)]\ \n Projection: test2.b\ - \n Filter: test2.b > UInt32(1)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.b > UInt32(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -2036,8 +2001,7 @@ mod tests { \n Projection: test.a, test.b, test.c\ \n TableScan: test\ \n Projection: test2.a, test2.b, test2.c\ - \n Filter: test2.c > UInt32(4)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.c > UInt32(4)]"; assert_optimized_plan_eq(&plan, expected) } @@ -2078,8 +2042,7 @@ mod tests { let expected = "\ Right Join: test.a = test2.a Filter: test.b < test2.b AND test2.c > UInt32(4)\ \n Projection: test.a, test.b, test.c\ - \n Filter: test.a > UInt32(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a > UInt32(1)]\ \n Projection: test2.a, test2.b, test2.c\ \n TableScan: test2"; assert_optimized_plan_eq(&plan, expected) @@ -2305,8 +2268,7 @@ Projection: a, b // rewrite filter col b to test.a let expected = "\ Projection: test.a AS b, test.c\ - \n Filter: test.a > Int64(10) AND test.c > Int64(10)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a > Int64(10), test.c > Int64(10)]\ "; assert_optimized_plan_eq(&plan, expected) @@ -2338,8 +2300,7 @@ Projection: a, b let expected = "\ Projection: b, test.c\ \n Projection: test.a AS b, test.c\ - \n Filter: test.a > Int64(10) AND test.c > Int64(10)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a > Int64(10), test.c > Int64(10)]\ "; assert_optimized_plan_eq(&plan, expected) @@ -2365,9 +2326,7 @@ Projection: a, b // rewrite filter col b to test.a, col d to test.c let expected = "\ Projection: test.a AS b, test.c AS d\ - \n Filter: test.a > Int64(10) AND test.c > Int64(10)\ - \n TableScan: test\ - "; + \n TableScan: test, full_filters=[test.a > Int64(10), test.c > Int64(10)]"; assert_optimized_plan_eq(&plan, expected) } @@ -2406,11 +2365,9 @@ Projection: a, b let expected = "\ Inner Join: c = d\ \n Projection: test.a AS c\ - \n Filter: test.a > UInt32(1)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.a > UInt32(1)]\ \n Projection: test2.b AS d\ - \n Filter: test2.b > UInt32(1)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.b > UInt32(1)]"; assert_optimized_plan_eq(&plan, expected) } @@ -2438,9 +2395,7 @@ Projection: a, b // rewrite filter col b to test.a let expected = "\ Projection: test.a AS b, test.c\ - \n Filter: test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\ - \n TableScan: test\ - "; + \n TableScan: test, full_filters=[test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])]"; assert_optimized_plan_eq(&plan, expected) } @@ -2472,9 +2427,7 @@ Projection: a, b let expected = "\ Projection: b, test.c\ \n Projection: test.a AS b, test.c\ - \n Filter: test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])\ - \n TableScan: test\ - "; + \n TableScan: test, full_filters=[test.a IN ([UInt32(1), UInt32(2), UInt32(3), UInt32(4)])]"; assert_optimized_plan_eq(&plan, expected) } @@ -2508,11 +2461,10 @@ Projection: a, b // rewrite filter col b to test.a let expected_after = "\ Projection: test.a AS b, test.c\ - \n Filter: test.a IN ()\ + \n TableScan: test, full_filters=[test.a IN ()]\ \n Subquery:\ \n Projection: sq.c\ - \n TableScan: sq\ - \n TableScan: test"; + \n TableScan: sq"; assert_optimized_plan_eq(&plan, expected_after) } @@ -2573,8 +2525,7 @@ Projection: a, b Filter: test.a = d AND test.b > UInt32(1) OR test.b = e AND test.c < UInt32(10)\ \n CrossJoin:\ \n Projection: test.a, test.b, test.c\ - \n Filter: test.b > UInt32(1) OR test.c < UInt32(10)\ - \n TableScan: test\ + \n TableScan: test, full_filters=[test.b > UInt32(1) OR test.c < UInt32(10)]\ \n Projection: test1.a AS d, test1.a AS e\ \n TableScan: test1"; assert_optimized_plan_eq_with_rewrite_predicate(&plan, expected)?; @@ -2622,11 +2573,9 @@ Projection: a, b // Both side will be pushed down. let expected = "\ LeftSemi Join: test1.a = test2.a\ - \n Filter: test1.b > UInt32(1)\ - \n TableScan: test1\ + \n TableScan: test1, full_filters=[test1.b > UInt32(1)]\ \n Projection: test2.a, test2.b\ - \n Filter: test2.b > UInt32(2)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.b > UInt32(2)]"; assert_optimized_plan_eq(&plan, expected) } @@ -2665,11 +2614,9 @@ Projection: a, b // Both side will be pushed down. let expected = "\ RightSemi Join: test1.a = test2.a\ - \n Filter: test1.b > UInt32(1)\ - \n TableScan: test1\ + \n TableScan: test1, full_filters=[test1.b > UInt32(1)]\ \n Projection: test2.a, test2.b\ - \n Filter: test2.b > UInt32(2)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.b > UInt32(2)]"; assert_optimized_plan_eq(&plan, expected) } @@ -2715,8 +2662,7 @@ Projection: a, b \n Projection: test1.a, test1.b\ \n TableScan: test1\ \n Projection: test2.a, test2.b\ - \n Filter: test2.b > UInt32(2)\ - \n TableScan: test2"; + \n TableScan: test2, full_filters=[test2.b > UInt32(2)]"; assert_optimized_plan_eq(&plan, expected) } @@ -2759,8 +2705,7 @@ Projection: a, b // For right anti, filter of the left side can be pushed down. let expected = "RightAnti Join: test1.a = test2.a Filter: test2.b > UInt32(2)\ \n Projection: test1.a, test1.b\ - \n Filter: test1.b > UInt32(1)\ - \n TableScan: test1\ + \n TableScan: test1, full_filters=[test1.b > UInt32(1)]\ \n Projection: test2.a, test2.b\ \n TableScan: test2"; assert_optimized_plan_eq(&plan, expected) diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs index 0469f678e09ef..6db4bb9ba405b 100644 --- a/datafusion/optimizer/src/push_down_projection.rs +++ b/datafusion/optimizer/src/push_down_projection.rs @@ -147,10 +147,6 @@ impl OptimizerRule for PushDownProjection { if !scan.projected_schema.fields().is_empty() => { let mut used_columns: HashSet = HashSet::new(); - // filter expr may not exist in expr in projection. - // like: TableScan: t1 projection=[bool_col, int_col], full_filters=[t1.id = Int32(1)] - // projection=[bool_col, int_col] don't contain `ti.id`. - exprlist_to_columns(&scan.filters, &mut used_columns)?; if projection_is_empty { used_columns .insert(scan.projected_schema.fields()[0].qualified_column()); @@ -568,6 +564,7 @@ mod tests { use crate::OptimizerContext; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::DFSchema; + use datafusion_expr::builder::table_scan_with_filters; use datafusion_expr::expr; use datafusion_expr::expr::Cast; use datafusion_expr::WindowFrame; @@ -977,6 +974,35 @@ mod tests { assert_optimized_plan_eq(&plan, expected) } + #[test] + fn table_full_filter_pushdown() -> Result<()> { + let schema = Schema::new(test_table_scan_fields()); + + let table_scan = table_scan_with_filters( + Some("test"), + &schema, + None, + vec![col("b").eq(lit(1))], + )? + .build()?; + assert_eq!(3, table_scan.schema().fields().len()); + assert_fields_eq(&table_scan, vec!["a", "b", "c"]); + + // there is no need for the first projection + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("b")])? + .project(vec![lit(1).alias("a")])? + .build()?; + + assert_fields_eq(&plan, vec!["a"]); + + let expected = "\ + Projection: Int32(1) AS a\ + \n TableScan: test projection=[a], full_filters=[b = Int32(1)]"; + + assert_optimized_plan_eq(&plan, expected) + } + /// tests that optimizing twice yields same plan #[test] fn test_double_optimization() -> Result<()> { diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs index 59c1c5a1266d3..35a698b709ac3 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_exprs.rs @@ -833,8 +833,7 @@ mod tests { // before simplify: t.g = power(t.f, 1.0) // after simplify: (t.g = t.f) as "t.g = power(t.f, 1.0)" - let expected = - "TableScan: test, unsupported_filters=[g = f AS g = power(f,Float64(1))]"; + let expected = "TableScan: test, full_filters=[g = f AS g = power(f,Float64(1))]"; assert_optimized_plan_eq(&plan, expected) } diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs index 2e12a283ea6b6..468981a5fb0c8 100644 --- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs +++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs @@ -27,7 +27,7 @@ use arrow::datatypes::{ use arrow::temporal_conversions::{MICROSECONDS, MILLISECONDS, NANOSECONDS}; use datafusion_common::tree_node::{RewriteRecursion, TreeNodeRewriter}; use datafusion_common::{ - internal_err, DFSchemaRef, DataFusionError, Result, ScalarValue, + internal_err, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, }; use datafusion_expr::expr::{BinaryExpr, Cast, InList, TryCast}; use datafusion_expr::expr_rewriter::rewrite_preserving_name; @@ -91,6 +91,12 @@ impl OptimizerRule for UnwrapCastInComparison { ) -> Result> { let mut schema = merge_schema(plan.inputs()); + if let LogicalPlan::TableScan(ts) = plan { + let source_schema = + DFSchema::try_from_qualified_schema(&ts.table_name, &ts.source.schema())?; + schema.merge(&source_schema); + } + schema.merge(plan.schema()); let mut expr_rewriter = UnwrapCastExprRewriter {