From 51b0f58f3e08ec44c60e753b4cbcbad3f0e7ceb6 Mon Sep 17 00:00:00 2001 From: jackwener Date: Tue, 20 Dec 2022 20:52:20 +0800 Subject: [PATCH 1/3] remove more recursion in optimizer rule --- datafusion/optimizer/src/eliminate_filter.rs | 12 +- .../optimizer/src/eliminate_outer_join.rs | 12 +- .../optimizer/src/filter_null_join_keys.rs | 14 +- datafusion/optimizer/src/push_down_filter.rs | 129 ++++++++---------- datafusion/optimizer/src/push_down_limit.rs | 96 ++++++------- .../src/rewrite_disjunctive_predicate.rs | 16 ++- .../src/single_distinct_to_groupby.rs | 70 ++++------ .../optimizer/src/subquery_filter_to_join.rs | 56 +++----- datafusion/optimizer/src/test/mod.rs | 2 +- .../src/unwrap_cast_in_comparison.rs | 18 ++- 10 files changed, 192 insertions(+), 233 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_filter.rs b/datafusion/optimizer/src/eliminate_filter.rs index e4fc803416743..c5dc1711c244a 100644 --- a/datafusion/optimizer/src/eliminate_filter.rs +++ b/datafusion/optimizer/src/eliminate_filter.rs @@ -86,7 +86,7 @@ mod tests { use crate::test::*; - fn assert_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { + fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { assert_optimized_plan_eq(Arc::new(EliminateFilter::new()), plan, expected) } @@ -102,7 +102,7 @@ mod tests { // No aggregate / scan / limit let expected = "EmptyRelation"; - assert_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -124,7 +124,7 @@ mod tests { \n EmptyRelation\ \n Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\ \n TableScan: test"; - assert_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -139,7 +139,7 @@ mod tests { let expected = "Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\ \n TableScan: test"; - assert_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -162,7 +162,7 @@ mod tests { \n TableScan: test\ \n Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\ \n TableScan: test"; - assert_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -185,6 +185,6 @@ mod tests { // Filter is removed let expected = "Projection: test.a\ \n EmptyRelation"; - assert_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } } diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs index 8c02950d84762..590f9855529fb 100644 --- a/datafusion/optimizer/src/eliminate_outer_join.rs +++ b/datafusion/optimizer/src/eliminate_outer_join.rs @@ -309,7 +309,7 @@ mod tests { Operator::{And, Or}, }; - fn assert_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { + fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { assert_optimized_plan_eq(Arc::new(EliminateOuterJoin::new()), plan, expected) } @@ -333,7 +333,7 @@ mod tests { \n Left Join: t1.a = t2.a\ \n TableScan: t1\ \n TableScan: t2"; - assert_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -356,7 +356,7 @@ mod tests { \n Inner Join: t1.a = t2.a\ \n TableScan: t1\ \n TableScan: t2"; - assert_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -383,7 +383,7 @@ mod tests { \n Inner Join: t1.a = t2.a\ \n TableScan: t1\ \n TableScan: t2"; - assert_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -410,7 +410,7 @@ mod tests { \n Inner Join: t1.a = t2.a\ \n TableScan: t1\ \n TableScan: t2"; - assert_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -437,6 +437,6 @@ mod tests { \n Inner Join: t1.a = t2.a\ \n TableScan: t1\ \n TableScan: t2"; - assert_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } } diff --git a/datafusion/optimizer/src/filter_null_join_keys.rs b/datafusion/optimizer/src/filter_null_join_keys.rs index 8a6c995dee533..8f221eaccd351 100644 --- a/datafusion/optimizer/src/filter_null_join_keys.rs +++ b/datafusion/optimizer/src/filter_null_join_keys.rs @@ -115,7 +115,7 @@ mod tests { use datafusion_expr::logical_plan::table_scan; use datafusion_expr::{col, lit, logical_plan::JoinType, LogicalPlanBuilder}; - fn assert_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { + fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { assert_optimized_plan_eq(Arc::new(FilterNullJoinKeys {}), plan, expected) } @@ -127,7 +127,7 @@ mod tests { \n Filter: t1.optional_id IS NOT NULL\ \n TableScan: t1\ \n TableScan: t2"; - assert_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -138,7 +138,7 @@ mod tests { \n Filter: t1.optional_id IS NOT NULL\ \n TableScan: t1\ \n TableScan: t2"; - assert_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -175,7 +175,7 @@ mod tests { \n Filter: t1.optional_id IS NOT NULL\ \n TableScan: t1\ \n TableScan: t2"; - assert_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -196,7 +196,7 @@ mod tests { \n Filter: t1.optional_id + UInt32(1) IS NOT NULL\ \n TableScan: t1\ \n TableScan: t2"; - assert_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -217,7 +217,7 @@ mod tests { \n TableScan: t1\ \n Filter: t2.optional_id + UInt32(1) IS NOT NULL\ \n TableScan: t2"; - assert_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -240,7 +240,7 @@ mod tests { \n TableScan: t1\ \n Filter: t2.optional_id + UInt32(1) IS NOT NULL\ \n TableScan: t2"; - assert_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } fn build_plan( diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 4ca14c278a263..4d5d7554c3214 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -28,6 +28,7 @@ use datafusion_expr::{ use std::collections::{HashMap, HashSet}; use std::iter::once; use std::sync::Arc; +use crate::optimizer::ApplyOrder; /// Push Down Filter optimizer rule pushes filter clauses down the plan /// # Introduction @@ -513,6 +514,10 @@ impl OptimizerRule for PushDownFilter { "push_down_filter" } + fn apply_order(&self) -> Option { + Some(ApplyOrder::TopDown) + } + fn try_optimize( &self, plan: &LogicalPlan, @@ -523,16 +528,9 @@ impl OptimizerRule for PushDownFilter { // we also need to pushdown filter in Join. LogicalPlan::Join(join) => { let optimized_plan = push_down_join(plan, join, None)?; - return match optimized_plan { - Some(optimized_plan) => Ok(Some(utils::optimize_children( - self, - &optimized_plan, - config, - )?)), - None => Ok(Some(utils::optimize_children(self, plan, config)?)), - }; + return Ok(optimized_plan) } - _ => return Ok(Some(utils::optimize_children(self, plan, config)?)), + _ => return Ok(None), }; let child_plan = &**filter.input(); @@ -745,7 +743,7 @@ impl OptimizerRule for PushDownFilter { _ => plan.clone(), }; - Ok(Some(utils::optimize_children(self, &new_plan, config)?)) + Ok(Some(new_plan)) } } @@ -793,15 +791,8 @@ mod tests { }; use std::sync::Arc; - fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { - let optimized_plan = PushDownFilter::new() - .try_optimize(plan, &OptimizerContext::new()) - .unwrap() - .expect("failed to optimize plan"); - let formatted_plan = format!("{:?}", optimized_plan); - assert_eq!(plan.schema(), optimized_plan.schema()); - assert_eq!(expected, formatted_plan); - Ok(()) + fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { + assert_optimized_plan_eq(Arc::new(PushDownFilter::new()), plan, expected) } #[test] @@ -816,7 +807,7 @@ mod tests { Projection: test.a, test.b\ \n Filter: test.a = Int64(1)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -833,7 +824,7 @@ mod tests { \n Limit: skip=0, fetch=10\ \n Projection: test.a, test.b\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -845,7 +836,7 @@ mod tests { let expected = "\ Filter: Int64(0) = Int64(1)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -862,7 +853,7 @@ mod tests { \n Projection: test.a, test.b, test.c\ \n Filter: test.a = Int64(1)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -877,7 +868,7 @@ mod tests { Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b) AS total_salary]]\ \n Filter: test.a > Int64(10)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -890,7 +881,7 @@ mod tests { let expected = "Filter: test.b > Int64(10)\ \n Aggregate: groupBy=[[test.b + test.a]], aggr=[[SUM(test.a), test.b]]\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -903,7 +894,7 @@ mod tests { "Aggregate: groupBy=[[test.b + test.a]], aggr=[[SUM(test.a), test.b]]\ \n Filter: test.b + test.a > Int64(10)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -918,7 +909,7 @@ mod tests { Filter: b > Int64(10)\ \n Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b) AS b]]\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// verifies that a filter is pushed to before a projection, the filter expression is correctly re-written @@ -934,7 +925,7 @@ mod tests { Projection: test.a AS b, test.c\ \n Filter: test.a = Int64(1)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } fn add(left: Expr, right: Expr) -> Expr { @@ -979,7 +970,7 @@ mod tests { Projection: test.a * Int32(2) + test.c AS b, test.c\ \n Filter: test.a * Int32(2) + test.c = Int64(1)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// verifies that when a filter is pushed to after 2 projections, the filter expression is correctly re-written @@ -1012,7 +1003,7 @@ mod tests { \n Projection: test.a * Int32(2) + test.c AS b, test.c\ \n Filter: (test.a * Int32(2) + test.c) * Int32(3) = Int64(1)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// verifies that when two filters apply after an aggregation that only allows one to be pushed, one is pushed @@ -1046,7 +1037,7 @@ mod tests { \n Projection: test.a AS b, test.c\ \n Filter: test.a > Int64(10)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// verifies that when a filter with two predicates is applied after an aggregation that only allows one to be pushed, one is pushed @@ -1081,7 +1072,7 @@ mod tests { \n Projection: test.a AS b, test.c\ \n Filter: test.a > Int64(10)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// verifies that when two limits are in place, we jump neither @@ -1103,7 +1094,7 @@ mod tests { \n Limit: skip=0, fetch=20\ \n Projection: test.a, test.b\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -1120,7 +1111,7 @@ mod tests { \n TableScan: test\ \n Filter: test2.a = Int64(1)\ \n TableScan: test2"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -1145,7 +1136,7 @@ mod tests { \n Projection: test.a AS b\ \n Filter: test.a = Int64(1)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -1178,7 +1169,7 @@ mod tests { \n Filter: test1.d > Int32(2)\ \n TableScan: test1"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -1206,7 +1197,7 @@ mod tests { \n Projection: test1.a, test1.b, test1.c\ \n Filter: test1.a > Int32(2)\ \n TableScan: test1"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// verifies that filters with the same columns are correctly placed @@ -1241,7 +1232,7 @@ mod tests { \n Filter: test.a <= Int64(1)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// verifies that filters to be placed on the same depth are ANDed @@ -1271,7 +1262,7 @@ mod tests { \n Limit: skip=0, fetch=1\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// verifies that filters on a plan with user nodes are not lost @@ -1293,7 +1284,7 @@ mod tests { // not part of the test assert_eq!(format!("{:?}", plan), expected); - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// post-on-join predicates on a column common to both sides is pushed to both sides @@ -1333,7 +1324,7 @@ mod tests { \n Projection: test2.a\ \n Filter: test2.a <= Int64(1)\ \n TableScan: test2"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// post-using-join predicates on a column common to both sides is pushed to both sides @@ -1372,7 +1363,7 @@ mod tests { \n Projection: test2.a\ \n Filter: test2.a <= Int64(1)\ \n TableScan: test2"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// post-join predicates with columns from both sides are not pushed @@ -1410,7 +1401,7 @@ mod tests { // expected is equal: no push-down let expected = &format!("{:?}", plan); - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// post-join predicates with columns from one side of a join are pushed only to that side @@ -1453,7 +1444,7 @@ mod tests { \n TableScan: test\ \n Projection: test2.a, test2.c\ \n TableScan: test2"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// post-join predicates on the right side of a left join are not duplicated @@ -1492,7 +1483,7 @@ mod tests { \n TableScan: test\ \n Projection: test2.a\ \n TableScan: test2"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// post-join predicates on the left side of a right join are not duplicated @@ -1530,7 +1521,7 @@ mod tests { \n TableScan: test\ \n Projection: test2.a\ \n TableScan: test2"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// post-left-join predicate on a column common to both sides is only pushed to the left side @@ -1569,7 +1560,7 @@ mod tests { \n TableScan: test\ \n Projection: test2.a\ \n TableScan: test2"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// post-right-join predicate on a column common to both sides is only pushed to the right side @@ -1608,7 +1599,7 @@ mod tests { \n Projection: test2.a\ \n Filter: test2.a <= Int64(1)\ \n TableScan: test2"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// single table predicate parts of ON condition should be pushed to both inputs @@ -1653,7 +1644,7 @@ mod tests { \n Projection: test2.a, test2.b, test2.c\ \n Filter: test2.c > UInt32(4)\ \n TableScan: test2"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// join filter should be completely removed after pushdown @@ -1697,7 +1688,7 @@ mod tests { \n Projection: test2.a, test2.b, test2.c\ \n Filter: test2.c > UInt32(4)\ \n TableScan: test2"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// predicate on join key in filter expression should be pushed down to both inputs @@ -1739,7 +1730,7 @@ mod tests { \n Projection: test2.b\ \n Filter: test2.b > UInt32(1)\ \n TableScan: test2"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// single table predicate parts of ON condition should be pushed to right input @@ -1783,7 +1774,7 @@ mod tests { \n Projection: test2.a, test2.b, test2.c\ \n Filter: test2.c > UInt32(4)\ \n TableScan: test2"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// single table predicate parts of ON condition should be pushed to left input @@ -1827,7 +1818,7 @@ mod tests { \n TableScan: test\ \n Projection: test2.a, test2.b, test2.c\ \n TableScan: test2"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// single table predicate parts of ON condition should not be pushed @@ -1865,7 +1856,7 @@ mod tests { ); let expected = &format!("{:?}", plan); - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } struct PushDownProvider { @@ -1924,7 +1915,7 @@ mod tests { let expected = "\ TableScan: test, full_filters=[a = Int64(1)]"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -1935,7 +1926,7 @@ mod tests { let expected = "\ Filter: a = Int64(1)\ \n TableScan: test, partial_filters=[a = Int64(1)]"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -1954,7 +1945,7 @@ mod tests { // Optimizing the same plan multiple times should produce the same plan // each time. - assert_optimized_plan_eq(&optimised_plan, expected) + assert_optimized_plan_equal(&optimised_plan, expected) } #[test] @@ -1965,7 +1956,7 @@ mod tests { let expected = "\ Filter: a = Int64(1)\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -1994,7 +1985,7 @@ mod tests { \n Filter: a = Int64(10) AND b > Int64(11)\ \n TableScan: test projection=[a], partial_filters=[a = Int64(10), b > Int64(11)]"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -2023,7 +2014,7 @@ mod tests { \n TableScan: test\ "; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -2056,7 +2047,7 @@ mod tests { \n TableScan: test\ "; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -2083,7 +2074,7 @@ mod tests { \n TableScan: test\ "; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } /// predicate on join key in filter expression should be pushed down to both inputs @@ -2125,7 +2116,7 @@ mod tests { \n Projection: test2.b AS d\ \n Filter: test2.b > UInt32(1)\ \n TableScan: test2"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -2156,7 +2147,7 @@ mod tests { \n TableScan: test\ "; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -2190,7 +2181,7 @@ mod tests { \n TableScan: test\ "; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -2227,7 +2218,7 @@ mod tests { \n Projection: sq.c\ \n TableScan: sq\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected_after) + assert_optimized_plan_equal(&plan, expected_after) } #[test] @@ -2260,7 +2251,7 @@ mod tests { \n Projection: Int64(0) AS a\ \n Filter: Int64(0) = Int64(1)\ \n EmptyRelation"; - assert_optimized_plan_eq(&plan, expected_after) + assert_optimized_plan_equal(&plan, expected_after) } #[test] @@ -2291,7 +2282,7 @@ mod tests { \n TableScan: test\ \n Projection: test1.a AS d, test1.a AS e\ \n TableScan: test1"; - assert_optimized_plan_eq(&plan, expected)?; + assert_optimized_plan_equal(&plan, expected)?; // Originally global state which can help to avoid duplicate Filters been generated and pushed down. // Now the global state is removed. Need to double confirm that avoid duplicate Filters. @@ -2299,6 +2290,6 @@ mod tests { .try_optimize(&plan, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan"); - assert_optimized_plan_eq(&optimized_plan, expected) + assert_optimized_plan_equal(&optimized_plan, expected) } } diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs index ad5ceaea0b848..4070ef120b3e1 100644 --- a/datafusion/optimizer/src/push_down_limit.rs +++ b/datafusion/optimizer/src/push_down_limit.rs @@ -17,7 +17,8 @@ //! Optimizer rule to push down LIMIT in the query plan //! It will push down through projection, limits (taking the smaller limit) -use crate::{utils, OptimizerConfig, OptimizerRule}; +use crate::optimizer::ApplyOrder; +use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; use datafusion_expr::{ logical_plan::{Join, JoinType, Limit, LogicalPlan, Sort, TableScan, Union}, @@ -82,7 +83,7 @@ impl OptimizerRule for PushDownLimit { ) -> Result> { let limit = match plan { LogicalPlan::Limit(limit) => limit, - _ => return Ok(Some(utils::optimize_children(self, plan, config)?)), + _ => return Ok(None), }; if let LogicalPlan::Limit(child_limit) = &*limit.input { @@ -117,7 +118,7 @@ impl OptimizerRule for PushDownLimit { let fetch = match limit.fetch { Some(fetch) => fetch, - None => return Ok(Some(utils::optimize_children(self, plan, config)?)), + None => return Ok(None), }; let skip = limit.skip; @@ -225,12 +226,16 @@ impl OptimizerRule for PushDownLimit { _ => plan.clone(), }; - Ok(Some(utils::optimize_children(self, &plan, config)?)) + Ok(Some(plan)) } fn name(&self) -> &str { "push_down_limit" } + + fn apply_order(&self) -> Option { + Some(ApplyOrder::TopDown) + } } fn fetch_minus_skip(fetch: usize, skip: usize) -> usize { @@ -247,25 +252,14 @@ mod test { use super::*; use crate::test::*; - use crate::OptimizerContext; use datafusion_expr::{ col, exists, logical_plan::{builder::LogicalPlanBuilder, JoinType, LogicalPlan}, max, }; - fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { - let optimized_plan = PushDownLimit::new() - .try_optimize(plan, &OptimizerContext::new()) - .unwrap() - .expect("failed to optimize plan"); - - let formatted_plan = format!("{:?}", optimized_plan); - - assert_eq!(formatted_plan, expected); - assert_eq!(optimized_plan.schema(), plan.schema()); - - Ok(()) + fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { + assert_optimized_plan_eq(Arc::new(PushDownLimit::new()), plan, expected) } #[test] @@ -283,7 +277,7 @@ mod test { \n Limit: skip=0, fetch=1000\ \n TableScan: test, fetch=1000"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -301,7 +295,7 @@ mod test { let expected = "Limit: skip=0, fetch=10\ \n TableScan: test, fetch=10"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -318,7 +312,7 @@ mod test { \n Aggregate: groupBy=[[test.a]], aggr=[[MAX(test.b)]]\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -338,7 +332,7 @@ mod test { \n Limit: skip=0, fetch=1000\ \n TableScan: test, fetch=1000"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -355,7 +349,7 @@ mod test { \n Sort: test.a, fetch=10\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -372,7 +366,7 @@ mod test { \n Sort: test.a, fetch=15\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -391,7 +385,7 @@ mod test { \n Limit: skip=0, fetch=1000\ \n TableScan: test, fetch=1000"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -406,7 +400,7 @@ mod test { let expected = "Limit: skip=10, fetch=None\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -424,7 +418,7 @@ mod test { \n Limit: skip=10, fetch=1000\ \n TableScan: test, fetch=1010"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -441,7 +435,7 @@ mod test { \n Limit: skip=10, fetch=990\ \n TableScan: test, fetch=1000"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -458,7 +452,7 @@ mod test { \n Limit: skip=10, fetch=1000\ \n TableScan: test, fetch=1010"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -474,7 +468,7 @@ mod test { let expected = "Limit: skip=10, fetch=10\ \n TableScan: test, fetch=20"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -491,7 +485,7 @@ mod test { \n Aggregate: groupBy=[[test.a]], aggr=[[MAX(test.b)]]\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -511,7 +505,7 @@ mod test { \n Limit: skip=0, fetch=1010\ \n TableScan: test, fetch=1010"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -535,7 +529,7 @@ mod test { \n TableScan: test\ \n TableScan: test2"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -559,7 +553,7 @@ mod test { \n TableScan: test\ \n TableScan: test2"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -588,7 +582,7 @@ mod test { \n Projection: test2.a\ \n TableScan: test2"; - assert_optimized_plan_eq(&outer_query, expected) + assert_optimized_plan_equal(&outer_query, expected) } #[test] @@ -617,7 +611,7 @@ mod test { \n Projection: test2.a\ \n TableScan: test2"; - assert_optimized_plan_eq(&outer_query, expected) + assert_optimized_plan_equal(&outer_query, expected) } #[test] @@ -643,7 +637,7 @@ mod test { \n Limit: skip=0, fetch=1000\ \n TableScan: test2, fetch=1000"; - assert_optimized_plan_eq(&plan, expected)?; + assert_optimized_plan_equal(&plan, expected)?; let plan = LogicalPlanBuilder::from(table_scan_1.clone()) .join( @@ -662,7 +656,7 @@ mod test { \n Limit: skip=0, fetch=1000\ \n TableScan: test2, fetch=1000"; - assert_optimized_plan_eq(&plan, expected)?; + assert_optimized_plan_equal(&plan, expected)?; let plan = LogicalPlanBuilder::from(table_scan_1.clone()) .join( @@ -681,7 +675,7 @@ mod test { \n Limit: skip=0, fetch=1000\ \n TableScan: test2, fetch=1000"; - assert_optimized_plan_eq(&plan, expected)?; + assert_optimized_plan_equal(&plan, expected)?; let plan = LogicalPlanBuilder::from(table_scan_1.clone()) .join( @@ -699,7 +693,7 @@ mod test { \n TableScan: test, fetch=1000\ \n TableScan: test2"; - assert_optimized_plan_eq(&plan, expected)?; + assert_optimized_plan_equal(&plan, expected)?; let plan = LogicalPlanBuilder::from(table_scan_1.clone()) .join( @@ -717,7 +711,7 @@ mod test { \n TableScan: test, fetch=1000\ \n TableScan: test2"; - assert_optimized_plan_eq(&plan, expected)?; + assert_optimized_plan_equal(&plan, expected)?; let plan = LogicalPlanBuilder::from(table_scan_1.clone()) .join( @@ -735,7 +729,7 @@ mod test { \n Limit: skip=0, fetch=1000\ \n TableScan: test2, fetch=1000"; - assert_optimized_plan_eq(&plan, expected)?; + assert_optimized_plan_equal(&plan, expected)?; let plan = LogicalPlanBuilder::from(table_scan_1) .join( @@ -753,7 +747,7 @@ mod test { \n Limit: skip=0, fetch=1000\ \n TableScan: test2, fetch=1000"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -778,7 +772,7 @@ mod test { \n TableScan: test, fetch=1000\ \n TableScan: test2"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -803,7 +797,7 @@ mod test { \n TableScan: test, fetch=1010\ \n TableScan: test2"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -828,7 +822,7 @@ mod test { \n Limit: skip=0, fetch=1000\ \n TableScan: test2, fetch=1000"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -853,7 +847,7 @@ mod test { \n Limit: skip=0, fetch=1010\ \n TableScan: test2, fetch=1010"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -873,7 +867,7 @@ mod test { \n Limit: skip=0, fetch=1000\ \n TableScan: test2, fetch=1000"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -893,7 +887,7 @@ mod test { \n Limit: skip=0, fetch=2000\ \n TableScan: test2, fetch=2000"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -908,7 +902,7 @@ mod test { let expected = "Limit: skip=1000, fetch=0\ \n TableScan: test, fetch=0"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -923,7 +917,7 @@ mod test { let expected = "Limit: skip=1000, fetch=0\ \n TableScan: test, fetch=0"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -940,6 +934,6 @@ mod test { \n Limit: skip=1000, fetch=0\ \n TableScan: test, fetch=0"; - assert_optimized_plan_eq(&plan, expected) + assert_optimized_plan_equal(&plan, expected) } } diff --git a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs index 0f9ba3d371e12..064918b1a71f7 100644 --- a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs +++ b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. -use crate::{utils, OptimizerConfig, OptimizerRule}; +use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; use datafusion_expr::expr::BinaryExpr; use datafusion_expr::logical_plan::Filter; use datafusion_expr::{Expr, LogicalPlan, Operator}; -use std::sync::Arc; +use crate::optimizer::ApplyOrder; /// Optimizer pass that rewrites predicates of the form /// @@ -127,7 +127,7 @@ impl OptimizerRule for RewriteDisjunctivePredicate { fn try_optimize( &self, plan: &LogicalPlan, - config: &dyn OptimizerConfig, + _config: &dyn OptimizerConfig, ) -> Result> { match plan { LogicalPlan::Filter(filter) => { @@ -136,18 +136,20 @@ impl OptimizerRule for RewriteDisjunctivePredicate { let rewritten_expr = normalize_predicate(rewritten_predicate); Ok(Some(LogicalPlan::Filter(Filter::try_new( rewritten_expr, - self.try_optimize(filter.input(), config)? - .map(Arc::new) - .unwrap_or_else(|| filter.input().clone()), + filter.input.clone() )?))) } - _ => Ok(Some(utils::optimize_children(self, plan, config)?)), + _ => Ok(None), } } fn name(&self) -> &str { "rewrite_disjunctive_predicate" } + + fn apply_order(&self) -> Option { + Some(ApplyOrder::TopDown) + } } #[derive(Clone, PartialEq, Debug)] diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index bf4231c1f0cdc..74ad70e1d6b80 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -17,7 +17,8 @@ //! single distinct to group by optimizer rule -use crate::{utils, OptimizerConfig, OptimizerRule}; +use crate::optimizer::ApplyOrder; +use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::{DFSchema, Result}; use datafusion_expr::{ col, @@ -90,7 +91,7 @@ impl OptimizerRule for SingleDistinctToGroupBy { fn try_optimize( &self, plan: &LogicalPlan, - config: &dyn OptimizerConfig, + _config: &dyn OptimizerConfig, ) -> Result> { match plan { LogicalPlan::Aggregate(Aggregate { @@ -157,13 +158,11 @@ impl OptimizerRule for SingleDistinctToGroupBy { inner_fields, input.schema().metadata().clone(), )?; - let grouped_aggr = LogicalPlan::Aggregate(Aggregate::try_new( + let inner_agg = LogicalPlan::Aggregate(Aggregate::try_new( input.clone(), inner_group_exprs, Vec::new(), )?); - let inner_agg = - utils::optimize_children(self, &grouped_aggr, config)?; let outer_aggr_schema = Arc::new(DFSchema::new_with_metadata( outer_group_exprs @@ -207,15 +206,20 @@ impl OptimizerRule for SingleDistinctToGroupBy { )?, ))) } else { - Ok(Some(utils::optimize_children(self, plan, config)?)) + Ok(None) } } - _ => Ok(Some(utils::optimize_children(self, plan, config)?)), + _ => Ok(None), } } + fn name(&self) -> &str { "single_distinct_aggregation_to_group_by" } + + fn apply_order(&self) -> Option { + Some(ApplyOrder::TopDown) + } } #[cfg(test)] @@ -230,15 +234,9 @@ mod tests { AggregateFunction, }; - fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { - let rule = SingleDistinctToGroupBy::new(); - let optimized_plan = rule - .try_optimize(plan, &OptimizerContext::new()) - .unwrap() - .expect("failed to optimize plan"); - - let formatted_plan = format!("{}", optimized_plan.display_indent_schema()); - assert_eq!(formatted_plan, expected); + fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { + assert_optimized_plan_eq_display_indent(Arc::new(SingleDistinctToGroupBy::new()), plan, expected); + Ok(()) } #[test] @@ -254,8 +252,7 @@ mod tests { "Aggregate: groupBy=[[]], aggr=[[MAX(test.b)]] [MAX(test.b):UInt32;N]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_optimized_plan_eq(&plan, expected); - Ok(()) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -272,8 +269,7 @@ mod tests { \n Aggregate: groupBy=[[test.b AS alias1]], aggr=[[]] [alias1:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_optimized_plan_eq(&plan, expected); - Ok(()) + assert_optimized_plan_equal(&plan, expected) } // Currently this optimization is disabled for CUBE/ROLLUP/GROUPING SET @@ -294,8 +290,7 @@ mod tests { let expected = "Aggregate: groupBy=[[GROUPING SETS ((test.a), (test.b))]], aggr=[[COUNT(DISTINCT test.c)]] [a:UInt32, b:UInt32, COUNT(DISTINCT test.c):Int64;N]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_optimized_plan_eq(&plan, expected); - Ok(()) + assert_optimized_plan_equal(&plan, expected) } // Currently this optimization is disabled for CUBE/ROLLUP/GROUPING SET @@ -313,8 +308,7 @@ mod tests { let expected = "Aggregate: groupBy=[[CUBE (test.a, test.b)]], aggr=[[COUNT(DISTINCT test.c)]] [a:UInt32, b:UInt32, COUNT(DISTINCT test.c):Int64;N]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_optimized_plan_eq(&plan, expected); - Ok(()) + assert_optimized_plan_equal(&plan, expected) } // Currently this optimization is disabled for CUBE/ROLLUP/GROUPING SET @@ -333,8 +327,7 @@ mod tests { let expected = "Aggregate: groupBy=[[ROLLUP (test.a, test.b)]], aggr=[[COUNT(DISTINCT test.c)]] [a:UInt32, b:UInt32, COUNT(DISTINCT test.c):Int64;N]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_optimized_plan_eq(&plan, expected); - Ok(()) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -350,8 +343,7 @@ mod tests { \n Aggregate: groupBy=[[Int32(2) * test.b AS alias1]], aggr=[[]] [alias1:Int32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_optimized_plan_eq(&plan, expected); - Ok(()) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -368,8 +360,7 @@ mod tests { \n Aggregate: groupBy=[[test.a AS group_alias_0, test.b AS alias1]], aggr=[[]] [group_alias_0:UInt32, alias1:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_optimized_plan_eq(&plan, expected); - Ok(()) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -387,8 +378,7 @@ mod tests { let expected = "Aggregate: groupBy=[[test.a]], aggr=[[COUNT(DISTINCT test.b), COUNT(DISTINCT test.c)]] [a:UInt32, COUNT(DISTINCT test.b):Int64;N, COUNT(DISTINCT test.c):Int64;N]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_optimized_plan_eq(&plan, expected); - Ok(()) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -415,8 +405,7 @@ mod tests { \n Aggregate: groupBy=[[test.a AS group_alias_0, test.b AS alias1]], aggr=[[]] [group_alias_0:UInt32, alias1:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_optimized_plan_eq(&plan, expected); - Ok(()) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -434,19 +423,16 @@ mod tests { let expected = "Aggregate: groupBy=[[test.a]], aggr=[[COUNT(DISTINCT test.b), COUNT(test.c)]] [a:UInt32, COUNT(DISTINCT test.b):Int64;N, COUNT(test.c):Int64;N]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_optimized_plan_eq(&plan, expected); - Ok(()) + assert_optimized_plan_equal(&plan, expected) } #[test] - fn group_by_with_expr() { + fn group_by_with_expr() -> Result<()> { let table_scan = test_table_scan().unwrap(); let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("a") + lit(1)], vec![count_distinct(col("c"))]) - .unwrap() - .build() - .unwrap(); + .aggregate(vec![col("a") + lit(1)], vec![count_distinct(col("c"))])? + .build()?; // Should work let expected = "Projection: group_alias_0 AS test.a + Int32(1), COUNT(alias1) AS COUNT(DISTINCT test.c) [test.a + Int32(1):Int32, COUNT(DISTINCT test.c):Int64;N]\ @@ -454,6 +440,6 @@ mod tests { \n Aggregate: groupBy=[[test.a + Int32(1) AS group_alias_0, test.c AS alias1]], aggr=[[]] [group_alias_0:Int32, alias1:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_optimized_plan_eq(&plan, expected); + assert_optimized_plan_equal(&plan, expected) } } diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs b/datafusion/optimizer/src/subquery_filter_to_join.rs index da79869566d20..5ee0a0d6703fb 100644 --- a/datafusion/optimizer/src/subquery_filter_to_join.rs +++ b/datafusion/optimizer/src/subquery_filter_to_join.rs @@ -36,6 +36,7 @@ use datafusion_expr::{ Expr, }; use std::sync::Arc; +use crate::optimizer::ApplyOrder; /// Optimizer rule for rewriting subquery filters to joins #[derive(Default)] @@ -57,9 +58,7 @@ impl OptimizerRule for SubqueryFilterToJoin { match plan { LogicalPlan::Filter(filter) => { // Apply optimizer rule to current input - let optimized_input = self - .try_optimize(filter.input(), config)? - .unwrap_or_else(|| filter.input().as_ref().clone()); + let input = filter.input().as_ref().clone(); // Splitting filter expression into components by AND let filters = utils::split_conjunction(filter.predicate()); @@ -82,14 +81,14 @@ impl OptimizerRule for SubqueryFilterToJoin { if !subqueries_in_regular.is_empty() { return Ok(Some(LogicalPlan::Filter(Filter::try_new( filter.predicate().clone(), - Arc::new(optimized_input), + Arc::new(input), )?))); }; // Add subquery joins to new_input // optimized_input value should retain for possible optimization rollback let opt_result = subquery_filters.iter().try_fold( - optimized_input.clone(), + input.clone(), |input, &e| match e { Expr::InSubquery { expr, @@ -124,7 +123,7 @@ impl OptimizerRule for SubqueryFilterToJoin { }; let schema = build_join_schema( - optimized_input.schema(), + input.schema(), right_schema, &join_type, )?; @@ -154,7 +153,7 @@ impl OptimizerRule for SubqueryFilterToJoin { Err(_) => { return Ok(Some(LogicalPlan::Filter(Filter::try_new( filter.predicate().clone(), - Arc::new(optimized_input), + Arc::new(input), )?))) } }; @@ -167,8 +166,7 @@ impl OptimizerRule for SubqueryFilterToJoin { } } _ => { - // Apply the optimization to all inputs of the plan - Ok(Some(utils::optimize_children(self, plan, config)?)) + Ok(None) } } } @@ -176,6 +174,10 @@ impl OptimizerRule for SubqueryFilterToJoin { fn name(&self) -> &str { "subquery_filter_to_join" } + + fn apply_order(&self) -> Option { + Some(ApplyOrder::TopDown) + } } fn extract_subquery_filters(expression: &Expr, extracted: &mut Vec) -> Result<()> { @@ -200,20 +202,14 @@ fn extract_subquery_filters(expression: &Expr, extracted: &mut Vec) -> Res mod tests { use super::*; use crate::test::*; - use crate::OptimizerContext; use datafusion_expr::{ and, binary_expr, col, in_subquery, lit, logical_plan::LogicalPlanBuilder, not_in_subquery, or, Operator, }; - fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { - let rule = SubqueryFilterToJoin::new(); - let optimized_plan = rule - .try_optimize(plan, &OptimizerContext::new()) - .unwrap() - .expect("failed to optimize plan"); - let formatted_plan = format!("{}", optimized_plan.display_indent_schema()); - assert_eq!(formatted_plan, expected); + fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { + assert_optimized_plan_eq_display_indent(Arc::new(SubqueryFilterToJoin::new()), plan, expected); + Ok(()) } fn test_subquery_with_name(name: &str) -> Result> { @@ -240,8 +236,7 @@ mod tests { \n Projection: sq.c [c:UInt32]\ \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; - assert_optimized_plan_eq(&plan, expected); - Ok(()) + assert_optimized_plan_equal(&plan, expected) } /// Test for single NOT IN subquery filter @@ -259,8 +254,7 @@ mod tests { \n Projection: sq.c [c:UInt32]\ \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; - assert_optimized_plan_eq(&plan, expected); - Ok(()) + assert_optimized_plan_equal(&plan, expected) } /// Test for several IN subquery expressions @@ -284,8 +278,7 @@ mod tests { \n Projection: sq_2.c [c:UInt32]\ \n TableScan: sq_2 [a:UInt32, b:UInt32, c:UInt32]"; - assert_optimized_plan_eq(&plan, expected); - Ok(()) + assert_optimized_plan_equal(&plan, expected) } /// Test for IN subquery with additional AND filter @@ -310,8 +303,7 @@ mod tests { \n Projection: sq.c [c:UInt32]\ \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"; - assert_optimized_plan_eq(&plan, expected); - Ok(()) + assert_optimized_plan_equal(&plan, expected) } /// Test for IN subquery with additional OR filter @@ -337,8 +329,7 @@ mod tests { \n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_optimized_plan_eq(&plan, expected); - Ok(()) + assert_optimized_plan_equal(&plan, expected) } #[test] @@ -365,8 +356,7 @@ mod tests { \n TableScan: sq2 [a:UInt32, b:UInt32, c:UInt32]\ \n TableScan: test [a:UInt32, b:UInt32, c:UInt32]"; - assert_optimized_plan_eq(&plan, expected); - Ok(()) + assert_optimized_plan_equal(&plan, expected) } /// Test for nested IN subqueries @@ -393,8 +383,7 @@ mod tests { \n Projection: sq_nested.c [c:UInt32]\ \n TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]"; - assert_optimized_plan_eq(&plan, expected); - Ok(()) + assert_optimized_plan_equal(&plan, expected) } /// Test for filter input modification in case filter not supported @@ -425,7 +414,6 @@ mod tests { \n Projection: sq_inner.c [c:UInt32]\ \n TableScan: sq_inner [a:UInt32, b:UInt32, c:UInt32]"; - assert_optimized_plan_eq(&plan, expected); - Ok(()) + assert_optimized_plan_equal(&plan, expected) } } diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs index a51c2ec29fe78..7532a9d1a3785 100644 --- a/datafusion/optimizer/src/test/mod.rs +++ b/datafusion/optimizer/src/test/mod.rs @@ -114,7 +114,7 @@ pub fn assert_optimized_plan_eq( plan, &OptimizerContext::new(), )? - .unwrap(); + .unwrap_or_else(|| plan.clone()); let formatted_plan = format!("{:?}", optimized_plan); assert_eq!(formatted_plan, expected); diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs index 9f6d1a5ac2ba9..9c4887699ea7a 100644 --- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs +++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs @@ -18,6 +18,7 @@ //! Unwrap-cast binary comparison rule can be used to the binary/inlist comparison expr now, and other type //! of expr can be added if needed. //! This rule can reduce adding the `Expr::Cast` the expr instead of adding the `Expr::Cast` to literal expr. +use crate::optimizer::ApplyOrder; use crate::utils::rewrite_preserving_name; use crate::{OptimizerConfig, OptimizerRule}; use arrow::datatypes::{ @@ -84,16 +85,9 @@ impl OptimizerRule for UnwrapCastInComparison { plan: &LogicalPlan, _config: &dyn OptimizerConfig, ) -> Result> { - let new_inputs = plan - .inputs() - .into_iter() - .map(|input| { - self.try_optimize(input, _config) - .map(|o| o.unwrap_or_else(|| input.clone())) - }) - .collect::>>()?; + let inputs: Vec = plan.inputs().into_iter().cloned().collect(); - let mut schema = new_inputs.iter().map(|input| input.schema()).fold( + let mut schema = inputs.iter().map(|input| input.schema()).fold( DFSchema::empty(), |mut lhs, rhs| { lhs.merge(rhs); @@ -116,13 +110,17 @@ impl OptimizerRule for UnwrapCastInComparison { Ok(Some(from_plan( plan, new_exprs.as_slice(), - new_inputs.as_slice(), + inputs.as_slice(), )?)) } fn name(&self) -> &str { "unwrap_cast_in_comparison" } + + fn apply_order(&self) -> Option { + Some(ApplyOrder::BottomUp) + } } struct UnwrapCastExprRewriter { From 9130c54aac74a94493b127beb2cf4ccad9b3ab2e Mon Sep 17 00:00:00 2001 From: jackwener Date: Tue, 20 Dec 2022 20:54:13 +0800 Subject: [PATCH 2/3] revert change push_down_filter --- datafusion/optimizer/src/push_down_filter.rs | 129 ++++++++++-------- .../src/single_distinct_to_groupby.rs | 1 - 2 files changed, 69 insertions(+), 61 deletions(-) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 4d5d7554c3214..4ca14c278a263 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -28,7 +28,6 @@ use datafusion_expr::{ use std::collections::{HashMap, HashSet}; use std::iter::once; use std::sync::Arc; -use crate::optimizer::ApplyOrder; /// Push Down Filter optimizer rule pushes filter clauses down the plan /// # Introduction @@ -514,10 +513,6 @@ impl OptimizerRule for PushDownFilter { "push_down_filter" } - fn apply_order(&self) -> Option { - Some(ApplyOrder::TopDown) - } - fn try_optimize( &self, plan: &LogicalPlan, @@ -528,9 +523,16 @@ impl OptimizerRule for PushDownFilter { // we also need to pushdown filter in Join. LogicalPlan::Join(join) => { let optimized_plan = push_down_join(plan, join, None)?; - return Ok(optimized_plan) + return match optimized_plan { + Some(optimized_plan) => Ok(Some(utils::optimize_children( + self, + &optimized_plan, + config, + )?)), + None => Ok(Some(utils::optimize_children(self, plan, config)?)), + }; } - _ => return Ok(None), + _ => return Ok(Some(utils::optimize_children(self, plan, config)?)), }; let child_plan = &**filter.input(); @@ -743,7 +745,7 @@ impl OptimizerRule for PushDownFilter { _ => plan.clone(), }; - Ok(Some(new_plan)) + Ok(Some(utils::optimize_children(self, &new_plan, config)?)) } } @@ -791,8 +793,15 @@ mod tests { }; use std::sync::Arc; - fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { - assert_optimized_plan_eq(Arc::new(PushDownFilter::new()), plan, expected) + fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { + let optimized_plan = PushDownFilter::new() + .try_optimize(plan, &OptimizerContext::new()) + .unwrap() + .expect("failed to optimize plan"); + let formatted_plan = format!("{:?}", optimized_plan); + assert_eq!(plan.schema(), optimized_plan.schema()); + assert_eq!(expected, formatted_plan); + Ok(()) } #[test] @@ -807,7 +816,7 @@ mod tests { Projection: test.a, test.b\ \n Filter: test.a = Int64(1)\ \n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } #[test] @@ -824,7 +833,7 @@ mod tests { \n Limit: skip=0, fetch=10\ \n Projection: test.a, test.b\ \n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } #[test] @@ -836,7 +845,7 @@ mod tests { let expected = "\ Filter: Int64(0) = Int64(1)\ \n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } #[test] @@ -853,7 +862,7 @@ mod tests { \n Projection: test.a, test.b, test.c\ \n Filter: test.a = Int64(1)\ \n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } #[test] @@ -868,7 +877,7 @@ mod tests { Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b) AS total_salary]]\ \n Filter: test.a > Int64(10)\ \n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } #[test] @@ -881,7 +890,7 @@ mod tests { let expected = "Filter: test.b > Int64(10)\ \n Aggregate: groupBy=[[test.b + test.a]], aggr=[[SUM(test.a), test.b]]\ \n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } #[test] @@ -894,7 +903,7 @@ mod tests { "Aggregate: groupBy=[[test.b + test.a]], aggr=[[SUM(test.a), test.b]]\ \n Filter: test.b + test.a > Int64(10)\ \n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } #[test] @@ -909,7 +918,7 @@ mod tests { Filter: b > Int64(10)\ \n Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b) AS b]]\ \n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// verifies that a filter is pushed to before a projection, the filter expression is correctly re-written @@ -925,7 +934,7 @@ mod tests { Projection: test.a AS b, test.c\ \n Filter: test.a = Int64(1)\ \n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } fn add(left: Expr, right: Expr) -> Expr { @@ -970,7 +979,7 @@ mod tests { Projection: test.a * Int32(2) + test.c AS b, test.c\ \n Filter: test.a * Int32(2) + test.c = Int64(1)\ \n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// verifies that when a filter is pushed to after 2 projections, the filter expression is correctly re-written @@ -1003,7 +1012,7 @@ mod tests { \n Projection: test.a * Int32(2) + test.c AS b, test.c\ \n Filter: (test.a * Int32(2) + test.c) * Int32(3) = Int64(1)\ \n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// verifies that when two filters apply after an aggregation that only allows one to be pushed, one is pushed @@ -1037,7 +1046,7 @@ mod tests { \n Projection: test.a AS b, test.c\ \n Filter: test.a > Int64(10)\ \n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// verifies that when a filter with two predicates is applied after an aggregation that only allows one to be pushed, one is pushed @@ -1072,7 +1081,7 @@ mod tests { \n Projection: test.a AS b, test.c\ \n Filter: test.a > Int64(10)\ \n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// verifies that when two limits are in place, we jump neither @@ -1094,7 +1103,7 @@ mod tests { \n Limit: skip=0, fetch=20\ \n Projection: test.a, test.b\ \n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } #[test] @@ -1111,7 +1120,7 @@ mod tests { \n TableScan: test\ \n Filter: test2.a = Int64(1)\ \n TableScan: test2"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } #[test] @@ -1136,7 +1145,7 @@ mod tests { \n Projection: test.a AS b\ \n Filter: test.a = Int64(1)\ \n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } #[test] @@ -1169,7 +1178,7 @@ mod tests { \n Filter: test1.d > Int32(2)\ \n TableScan: test1"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } #[test] @@ -1197,7 +1206,7 @@ mod tests { \n Projection: test1.a, test1.b, test1.c\ \n Filter: test1.a > Int32(2)\ \n TableScan: test1"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// verifies that filters with the same columns are correctly placed @@ -1232,7 +1241,7 @@ mod tests { \n Filter: test.a <= Int64(1)\ \n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// verifies that filters to be placed on the same depth are ANDed @@ -1262,7 +1271,7 @@ mod tests { \n Limit: skip=0, fetch=1\ \n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// verifies that filters on a plan with user nodes are not lost @@ -1284,7 +1293,7 @@ mod tests { // not part of the test assert_eq!(format!("{:?}", plan), expected); - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// post-on-join predicates on a column common to both sides is pushed to both sides @@ -1324,7 +1333,7 @@ mod tests { \n Projection: test2.a\ \n Filter: test2.a <= Int64(1)\ \n TableScan: test2"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// post-using-join predicates on a column common to both sides is pushed to both sides @@ -1363,7 +1372,7 @@ mod tests { \n Projection: test2.a\ \n Filter: test2.a <= Int64(1)\ \n TableScan: test2"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// post-join predicates with columns from both sides are not pushed @@ -1401,7 +1410,7 @@ mod tests { // expected is equal: no push-down let expected = &format!("{:?}", plan); - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// post-join predicates with columns from one side of a join are pushed only to that side @@ -1444,7 +1453,7 @@ mod tests { \n TableScan: test\ \n Projection: test2.a, test2.c\ \n TableScan: test2"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// post-join predicates on the right side of a left join are not duplicated @@ -1483,7 +1492,7 @@ mod tests { \n TableScan: test\ \n Projection: test2.a\ \n TableScan: test2"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// post-join predicates on the left side of a right join are not duplicated @@ -1521,7 +1530,7 @@ mod tests { \n TableScan: test\ \n Projection: test2.a\ \n TableScan: test2"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// post-left-join predicate on a column common to both sides is only pushed to the left side @@ -1560,7 +1569,7 @@ mod tests { \n TableScan: test\ \n Projection: test2.a\ \n TableScan: test2"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// post-right-join predicate on a column common to both sides is only pushed to the right side @@ -1599,7 +1608,7 @@ mod tests { \n Projection: test2.a\ \n Filter: test2.a <= Int64(1)\ \n TableScan: test2"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// single table predicate parts of ON condition should be pushed to both inputs @@ -1644,7 +1653,7 @@ mod tests { \n Projection: test2.a, test2.b, test2.c\ \n Filter: test2.c > UInt32(4)\ \n TableScan: test2"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// join filter should be completely removed after pushdown @@ -1688,7 +1697,7 @@ mod tests { \n Projection: test2.a, test2.b, test2.c\ \n Filter: test2.c > UInt32(4)\ \n TableScan: test2"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// predicate on join key in filter expression should be pushed down to both inputs @@ -1730,7 +1739,7 @@ mod tests { \n Projection: test2.b\ \n Filter: test2.b > UInt32(1)\ \n TableScan: test2"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// single table predicate parts of ON condition should be pushed to right input @@ -1774,7 +1783,7 @@ mod tests { \n Projection: test2.a, test2.b, test2.c\ \n Filter: test2.c > UInt32(4)\ \n TableScan: test2"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// single table predicate parts of ON condition should be pushed to left input @@ -1818,7 +1827,7 @@ mod tests { \n TableScan: test\ \n Projection: test2.a, test2.b, test2.c\ \n TableScan: test2"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// single table predicate parts of ON condition should not be pushed @@ -1856,7 +1865,7 @@ mod tests { ); let expected = &format!("{:?}", plan); - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } struct PushDownProvider { @@ -1915,7 +1924,7 @@ mod tests { let expected = "\ TableScan: test, full_filters=[a = Int64(1)]"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } #[test] @@ -1926,7 +1935,7 @@ mod tests { let expected = "\ Filter: a = Int64(1)\ \n TableScan: test, partial_filters=[a = Int64(1)]"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } #[test] @@ -1945,7 +1954,7 @@ mod tests { // Optimizing the same plan multiple times should produce the same plan // each time. - assert_optimized_plan_equal(&optimised_plan, expected) + assert_optimized_plan_eq(&optimised_plan, expected) } #[test] @@ -1956,7 +1965,7 @@ mod tests { let expected = "\ Filter: a = Int64(1)\ \n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } #[test] @@ -1985,7 +1994,7 @@ mod tests { \n Filter: a = Int64(10) AND b > Int64(11)\ \n TableScan: test projection=[a], partial_filters=[a = Int64(10), b > Int64(11)]"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } #[test] @@ -2014,7 +2023,7 @@ mod tests { \n TableScan: test\ "; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } #[test] @@ -2047,7 +2056,7 @@ mod tests { \n TableScan: test\ "; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } #[test] @@ -2074,7 +2083,7 @@ mod tests { \n TableScan: test\ "; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } /// predicate on join key in filter expression should be pushed down to both inputs @@ -2116,7 +2125,7 @@ mod tests { \n Projection: test2.b AS d\ \n Filter: test2.b > UInt32(1)\ \n TableScan: test2"; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } #[test] @@ -2147,7 +2156,7 @@ mod tests { \n TableScan: test\ "; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } #[test] @@ -2181,7 +2190,7 @@ mod tests { \n TableScan: test\ "; - assert_optimized_plan_equal(&plan, expected) + assert_optimized_plan_eq(&plan, expected) } #[test] @@ -2218,7 +2227,7 @@ mod tests { \n Projection: sq.c\ \n TableScan: sq\ \n TableScan: test"; - assert_optimized_plan_equal(&plan, expected_after) + assert_optimized_plan_eq(&plan, expected_after) } #[test] @@ -2251,7 +2260,7 @@ mod tests { \n Projection: Int64(0) AS a\ \n Filter: Int64(0) = Int64(1)\ \n EmptyRelation"; - assert_optimized_plan_equal(&plan, expected_after) + assert_optimized_plan_eq(&plan, expected_after) } #[test] @@ -2282,7 +2291,7 @@ mod tests { \n TableScan: test\ \n Projection: test1.a AS d, test1.a AS e\ \n TableScan: test1"; - assert_optimized_plan_equal(&plan, expected)?; + assert_optimized_plan_eq(&plan, expected)?; // Originally global state which can help to avoid duplicate Filters been generated and pushed down. // Now the global state is removed. Need to double confirm that avoid duplicate Filters. @@ -2290,6 +2299,6 @@ mod tests { .try_optimize(&plan, &OptimizerContext::new()) .unwrap() .expect("failed to optimize plan"); - assert_optimized_plan_equal(&optimized_plan, expected) + assert_optimized_plan_eq(&optimized_plan, expected) } } diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 74ad70e1d6b80..9b766489266b6 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -226,7 +226,6 @@ impl OptimizerRule for SingleDistinctToGroupBy { mod tests { use super::*; use crate::test::*; - use crate::OptimizerContext; use datafusion_expr::expr; use datafusion_expr::expr::GroupingSet; use datafusion_expr::{ From 44cfcdaeb40d7e328bd2e3d48978d2c23d41a884 Mon Sep 17 00:00:00 2001 From: jackwener Date: Wed, 21 Dec 2022 20:38:56 +0800 Subject: [PATCH 3/3] cargo fmt --- datafusion/optimizer/src/push_down_limit.rs | 4 ++-- .../src/rewrite_disjunctive_predicate.rs | 4 ++-- .../optimizer/src/single_distinct_to_groupby.rs | 6 +++++- .../optimizer/src/subquery_filter_to_join.rs | 16 +++++++++------- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs index 4070ef120b3e1..2619091832e1b 100644 --- a/datafusion/optimizer/src/push_down_limit.rs +++ b/datafusion/optimizer/src/push_down_limit.rs @@ -79,7 +79,7 @@ impl OptimizerRule for PushDownLimit { fn try_optimize( &self, plan: &LogicalPlan, - config: &dyn OptimizerConfig, + _config: &dyn OptimizerConfig, ) -> Result> { let limit = match plan { LogicalPlan::Limit(limit) => limit, @@ -113,7 +113,7 @@ impl OptimizerRule for PushDownLimit { fetch: new_fetch, input: Arc::new((*child_limit.input).clone()), }); - return self.try_optimize(&plan, config); + return self.try_optimize(&plan, _config); } let fetch = match limit.fetch { diff --git a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs index 064918b1a71f7..c14574eaf6f7c 100644 --- a/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs +++ b/datafusion/optimizer/src/rewrite_disjunctive_predicate.rs @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. +use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; use datafusion_common::Result; use datafusion_expr::expr::BinaryExpr; use datafusion_expr::logical_plan::Filter; use datafusion_expr::{Expr, LogicalPlan, Operator}; -use crate::optimizer::ApplyOrder; /// Optimizer pass that rewrites predicates of the form /// @@ -136,7 +136,7 @@ impl OptimizerRule for RewriteDisjunctivePredicate { let rewritten_expr = normalize_predicate(rewritten_predicate); Ok(Some(LogicalPlan::Filter(Filter::try_new( rewritten_expr, - filter.input.clone() + filter.input.clone(), )?))) } _ => Ok(None), diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 9b766489266b6..c03d763b21e01 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -234,7 +234,11 @@ mod tests { }; fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { - assert_optimized_plan_eq_display_indent(Arc::new(SingleDistinctToGroupBy::new()), plan, expected); + assert_optimized_plan_eq_display_indent( + Arc::new(SingleDistinctToGroupBy::new()), + plan, + expected, + ); Ok(()) } diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs b/datafusion/optimizer/src/subquery_filter_to_join.rs index 5ee0a0d6703fb..8d0c4e88d02e1 100644 --- a/datafusion/optimizer/src/subquery_filter_to_join.rs +++ b/datafusion/optimizer/src/subquery_filter_to_join.rs @@ -26,6 +26,7 @@ //! WHERE t1.f IN (SELECT f FROM t2) OR t2.f = 'x' //! ``` //! won't +use crate::optimizer::ApplyOrder; use crate::{utils, OptimizerConfig, OptimizerRule}; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::{ @@ -36,7 +37,6 @@ use datafusion_expr::{ Expr, }; use std::sync::Arc; -use crate::optimizer::ApplyOrder; /// Optimizer rule for rewriting subquery filters to joins #[derive(Default)] @@ -53,7 +53,7 @@ impl OptimizerRule for SubqueryFilterToJoin { fn try_optimize( &self, plan: &LogicalPlan, - config: &dyn OptimizerConfig, + _config: &dyn OptimizerConfig, ) -> Result> { match plan { LogicalPlan::Filter(filter) => { @@ -97,7 +97,7 @@ impl OptimizerRule for SubqueryFilterToJoin { } => { let right_input = self.try_optimize( &subquery.subquery, - config + _config )?.unwrap_or_else(||subquery.subquery.as_ref().clone()); let right_schema = right_input.schema(); if right_schema.fields().len() != 1 { @@ -165,9 +165,7 @@ impl OptimizerRule for SubqueryFilterToJoin { Ok(Some(utils::add_filter(new_input, ®ular_filters)?)) } } - _ => { - Ok(None) - } + _ => Ok(None), } } @@ -208,7 +206,11 @@ mod tests { }; fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { - assert_optimized_plan_eq_display_indent(Arc::new(SubqueryFilterToJoin::new()), plan, expected); + assert_optimized_plan_eq_display_indent( + Arc::new(SubqueryFilterToJoin::new()), + plan, + expected, + ); Ok(()) }