From cc16648a6edc81a631786b864e4c6194e2ef8dad Mon Sep 17 00:00:00 2001 From: hhj Date: Thu, 7 Dec 2023 21:32:59 +0800 Subject: [PATCH 1/2] fix: don't unify projections if expr is non-trivial --- .../physical_optimizer/projection_pushdown.rs | 37 ++++++++++++++++-- datafusion/sqllogictest/test_files/select.slt | 39 +++++++++++++++++++ 2 files changed, 73 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index f6c94edd8ca32..d707701a8ad5b 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -20,6 +20,7 @@ //! projections one by one if the operator below is amenable to this. If a //! projection reaches a source, it can even dissappear from the plan entirely. +use std::collections::HashMap; use std::sync::Arc; use super::output_requirements::OutputRequirementExec; @@ -42,9 +43,9 @@ use crate::physical_plan::{Distribution, ExecutionPlan}; use arrow_schema::SchemaRef; use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_common::JoinSide; -use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::expressions::{Column, Literal}; use datafusion_physical_expr::{ Partitioning, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, }; @@ -245,12 +246,36 @@ fn try_swapping_with_streaming_table( } /// Unifies `projection` with its input (which is also a [`ProjectionExec`]). -/// Two consecutive projections can always merge into a single projection. fn try_unifying_projections( projection: &ProjectionExec, child: &ProjectionExec, ) -> Result>> { let mut projected_exprs = vec![]; + let mut column_ref_map: HashMap = HashMap::new(); + + // Collect the column references usage in the outer projection. 
+ projection.expr().iter().for_each(|(expr, _)| { + expr.apply(&mut |expr| { + Ok({ + if let Some(column) = expr.as_any().downcast_ref::() { + *column_ref_map.entry(column.clone()).or_default() += 1; + } + VisitRecursion::Continue + }) + }) + .unwrap(); + }); + + // Merging these projections is not always beneficial: if a non-trivial expression is + // referenced more than once, keeping the projections separate is beneficial because the + // inner projection acts as a caching mechanism for non-trivial computations. + // See discussion in: https://github.com/apache/arrow-datafusion/issues/8296 + if column_ref_map.iter().any(|(column, count)| { + *count > 1 && !is_expr_trivial(&child.expr()[column.index()].0.clone()) + }) { + return Ok(None); + } + for (expr, alias) in projection.expr() { // If there is no match in the input projection, we cannot unify these // projections. This case will arise if the projection expression contains @@ -265,6 +290,12 @@ fn try_unifying_projections( .map(|e| Some(Arc::new(e) as _)) } +// check if the expr trival +fn is_expr_trivial(expr: &Arc) -> bool { + expr.as_any().downcast_ref::().is_some() + || expr.as_any().downcast_ref::().is_some() +} + /// Tries to swap `projection` with its input (`output_req`). If possible, /// performs the swap and returns [`OutputRequirementExec`] as the top plan. /// Otherwise, returns `None`. 
diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt index bbb05b6cffa71..ea570b99d4dd1 100644 --- a/datafusion/sqllogictest/test_files/select.slt +++ b/datafusion/sqllogictest/test_files/select.slt @@ -1065,3 +1065,42 @@ select z+1, y from (select x+1 as z, y from t) where y > 1; ---- 3 2 3 3 + +query TT +EXPLAIN SELECT x/2, x/2+1 FROM t; +---- +logical_plan +Projection: t.x / Int64(2)Int64(2)t.x AS t.x / Int64(2), t.x / Int64(2)Int64(2)t.x AS t.x / Int64(2) + Int64(1) +--Projection: t.x / Int64(2) AS t.x / Int64(2)Int64(2)t.x +----TableScan: t projection=[x] +physical_plan +ProjectionExec: expr=[t.x / Int64(2)Int64(2)t.x@0 as t.x / Int64(2), t.x / Int64(2)Int64(2)t.x@0 + 1 as t.x / Int64(2) + Int64(1)] +--ProjectionExec: expr=[x@0 / 2 as t.x / Int64(2)Int64(2)t.x] +----MemoryExec: partitions=1, partition_sizes=[1] + +query II +SELECT x/2, x/2+1 FROM t; +---- +0 1 +0 1 + +query TT +EXPLAIN SELECT abs(x), abs(x) + abs(y) FROM t; +---- +logical_plan +Projection: abs(t.x)t.x AS abs(t.x), abs(t.x)t.x AS abs(t.x) + abs(t.y) +--Projection: abs(t.x) AS abs(t.x)t.x, t.y +----TableScan: t projection=[x, y] +physical_plan +ProjectionExec: expr=[abs(t.x)t.x@0 as abs(t.x), abs(t.x)t.x@0 + abs(y@1) as abs(t.x) + abs(t.y)] +--ProjectionExec: expr=[abs(x@0) as abs(t.x)t.x, y@1 as y] +----MemoryExec: partitions=1, partition_sizes=[1] + +query II +SELECT abs(x), abs(x) + abs(y) FROM t; +---- +1 3 +1 4 + +statement ok +DROP TABLE t; From 4b009e8271c41fa721be33e4bd16d0b17aecf1af Mon Sep 17 00:00:00 2001 From: Huaijin Date: Fri, 8 Dec 2023 17:03:18 +0800 Subject: [PATCH 2/2] Update datafusion/core/src/physical_optimizer/projection_pushdown.rs Co-authored-by: Alex Huang --- datafusion/core/src/physical_optimizer/projection_pushdown.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 
d707701a8ad5b..67a2eaf0d9b3e 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -290,7 +290,8 @@ fn try_unifying_projections( .map(|e| Some(Arc::new(e) as _)) } -// check if the expr trival +/// Checks if the given expression is trivial. +/// An expression is considered trivial if it is either a `Column` or a `Literal`. fn is_expr_trivial(expr: &Arc) -> bool { expr.as_any().downcast_ref::().is_some() || expr.as_any().downcast_ref::().is_some()