From a5e954e20b0dccb2f1290d00164a9607f685209a Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Tue, 2 Sep 2025 16:26:17 +0800 Subject: [PATCH 1/4] [wip][VL]optimize sort window to streaming window if child order matches --- .../execution/WindowExecTransformer.scala | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala index 5cf506adc46d..de01031ae8a7 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala @@ -85,7 +85,19 @@ case class WindowExecTransformer( val windowParametersStr = new StringBuffer("WindowParameters:") // isStreaming: 1 for streaming, 0 for sort val isStreaming: Int = - if (BackendsApiManager.getSettings.requiredChildOrderingForWindow()) 1 else 0 + if (BackendsApiManager.getSettings.requiredChildOrderingForWindow()) { + 1 + } else { + val isAlreadySatisfied = isChildOrderAlreadySatisfied + // Optimize sort window to streaming window when child order already match window requirements. 
+ if (isAlreadySatisfied) { + logInfo("Mark window type to streaming since child order is already satisfied.") + 1 + } else { + logInfo("Mark window type to sort since child order is not satisfied.") + 0 + } + } windowParametersStr .append("isStreaming=") @@ -184,4 +196,21 @@ case class WindowExecTransformer( override protected def withNewChildInternal(newChild: SparkPlan): WindowExecTransformer = copy(child = newChild) + + private def isChildOrderAlreadySatisfied: Boolean = { + val requiredOrdering = partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec + logInfo( + s"Check if child order already satisfied, " + + s"current window node is ${this.treeString}, " + + s"required ordering is ${requiredOrdering.map(_.treeString).mkString(",")}, " + + s"child is ${child.treeString}, " + + s"child.outputOrder is ${child.outputOrdering.map(_.treeString).mkString(",")}") + if (requiredOrdering.size <= child.outputOrdering.size) { + SortOrder.orderingSatisfies( + child.outputOrdering.take(requiredOrdering.size), + requiredOrdering) + } else { + false + } + } } From 686a4742215224ff876b80da03cee60d4d329030 Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Fri, 5 Sep 2025 18:02:41 +0800 Subject: [PATCH 2/4] add test --- .../gluten/execution/MiscOperatorSuite.scala | 55 +++++++++++++++++++ .../execution/WindowExecTransformer.scala | 3 +- 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala index e14dd1fcb5a8..30476ac2e3ac 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala @@ -524,6 +524,61 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa } } + test("optimize sort window to streaming window") { + 
withSQLConf(VeloxConfig.COLUMNAR_VELOX_WINDOW_TYPE.key -> "sort") { + runQueryAndCompare( + "select sum(l_partkey + 1) over" + + " (partition by l_suppkey order by l_orderkey) from lineitem" + + " distribute by l_suppkey order by l_oderkey") { + checkGlutenOperatorMatch[WindowExecTransformer] + } + + runQueryAndCompare( + "select max(l_partkey) over" + + " (partition by l_suppkey order by l_orderkey) from lineitem ") { + checkGlutenOperatorMatch[WindowExecTransformer] + } + + runQueryAndCompare( + "select min(l_partkey) over" + + " (partition by l_suppkey order by l_orderkey) from lineitem ") { + checkGlutenOperatorMatch[WindowExecTransformer] + } + + runQueryAndCompare( + "select avg(l_partkey) over" + + " (partition by l_suppkey order by l_orderkey) from lineitem ") { + checkGlutenOperatorMatch[WindowExecTransformer] + } + + runQueryAndCompare( + "select lag(l_orderkey) over" + + " (partition by l_suppkey order by l_orderkey) from lineitem ") { + checkGlutenOperatorMatch[WindowExecTransformer] + } + + runQueryAndCompare( + "select lead(l_orderkey) over" + + " (partition by l_suppkey order by l_orderkey) from lineitem ") { + checkGlutenOperatorMatch[WindowExecTransformer] + } + + // Test same partition/ordering keys. + runQueryAndCompare( + "select avg(l_partkey) over" + + " (partition by l_suppkey order by l_suppkey) from lineitem ") { + checkGlutenOperatorMatch[WindowExecTransformer] + } + + // Test overlapping partition/ordering keys. 
+ runQueryAndCompare( + "select avg(l_partkey) over" + + " (partition by l_suppkey order by l_suppkey, l_orderkey) from lineitem ") { + checkGlutenOperatorMatch[WindowExecTransformer] + } + } + } + test("df.count()") { val df = runQueryAndCompare("select * from lineitem limit 1") { _ => } checkLengthAndPlan(df, 1) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala index de01031ae8a7..8a5af24dfa50 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala @@ -89,7 +89,8 @@ case class WindowExecTransformer( 1 } else { val isAlreadySatisfied = isChildOrderAlreadySatisfied - // Optimize sort window to streaming window when child order already match window requirements. + // Optimize sort window to streaming window when child order already + // match window requirements. 
if (isAlreadySatisfied) { logInfo("Mark window type to streaming since child order is already satisfied.") 1 From 5e47c56a0a6b77737bbed739c045ec97029822ec Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Mon, 8 Sep 2025 17:46:43 +0800 Subject: [PATCH 3/4] update test --- .../gluten/execution/MiscOperatorSuite.scala | 55 +++---------------- 1 file changed, 9 insertions(+), 46 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala index 30476ac2e3ac..21aa2690ba94 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala @@ -527,53 +527,16 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa test("optimize sort window to streaming window") { withSQLConf(VeloxConfig.COLUMNAR_VELOX_WINDOW_TYPE.key -> "sort") { runQueryAndCompare( - "select sum(l_partkey + 1) over" + - " (partition by l_suppkey order by l_orderkey) from lineitem" + - " distribute by l_suppkey order by l_oderkey") { - checkGlutenOperatorMatch[WindowExecTransformer] - } - - runQueryAndCompare( - "select max(l_partkey) over" + - " (partition by l_suppkey order by l_orderkey) from lineitem ") { - checkGlutenOperatorMatch[WindowExecTransformer] - } - - runQueryAndCompare( - "select min(l_partkey) over" + - " (partition by l_suppkey order by l_orderkey) from lineitem ") { - checkGlutenOperatorMatch[WindowExecTransformer] - } - - runQueryAndCompare( - "select avg(l_partkey) over" + - " (partition by l_suppkey order by l_orderkey) from lineitem ") { - checkGlutenOperatorMatch[WindowExecTransformer] - } - - runQueryAndCompare( - "select lag(l_orderkey) over" + - " (partition by l_suppkey order by l_orderkey) from lineitem ") { - checkGlutenOperatorMatch[WindowExecTransformer] - } - - runQueryAndCompare( - "select 
lead(l_orderkey) over" + - " (partition by l_suppkey order by l_orderkey) from lineitem ") { - checkGlutenOperatorMatch[WindowExecTransformer] - } - - // Test same partition/ordering keys. - runQueryAndCompare( - "select avg(l_partkey) over" + - " (partition by l_suppkey order by l_suppkey) from lineitem ") { - checkGlutenOperatorMatch[WindowExecTransformer] - } + """ + |select sum(l_partkey + 1) over + | (partition by l_suppkey order by l_orderkey) + | from + | ( select * from lineitem + | sort by l_suppkey, l_orderkey + | ) + |""".stripMargin - // Test overlapping partition/ordering keys. - runQueryAndCompare( - "select avg(l_partkey) over" + - " (partition by l_suppkey order by l_suppkey, l_orderkey) from lineitem ") { + ) { checkGlutenOperatorMatch[WindowExecTransformer] } } From 27060c6884f746bc2feb62294a83f6e8e84b363c Mon Sep 17 00:00:00 2001 From: Terry Wang Date: Wed, 10 Sep 2025 17:01:07 +0800 Subject: [PATCH 4/4] add test --- .../gluten/execution/MiscOperatorSuite.scala | 64 +++++++++++++++++-- .../execution/WindowExecTransformer.scala | 12 +++- 2 files changed, 67 insertions(+), 9 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala index 21aa2690ba94..163a21c2a502 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala @@ -528,16 +528,66 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa withSQLConf(VeloxConfig.COLUMNAR_VELOX_WINDOW_TYPE.key -> "sort") { runQueryAndCompare( """ - |select sum(l_partkey + 1) over - | (partition by l_suppkey order by l_orderkey) - | from - | ( select * from lineitem - | sort by l_suppkey, l_orderkey - | ) + |SELECT avg(sum_part_key) OVER ( + | PARTITION BY + | l_suppkey + | ORDER BY + | l_orderkey + | ) AS avg_sum_part_key + 
|FROM ( + | SELECT l_suppkey, + | l_orderkey, + | sum(l_partkey+1) OVER ( + | PARTITION BY + | l_suppkey + | ORDER BY + | l_orderkey + | ) AS sum_part_key + | FROM lineitem + | ) |""".stripMargin + ) { + df => + { + val executedPlan = getExecutedPlan(df) + val windowPlans = executedPlan + .filter(_.isInstanceOf[WindowExecTransformer]) + .asInstanceOf[Seq[WindowExecTransformer]] + assert(windowPlans.size == 2) + assert(windowPlans.count(_.isChildOrderAlreadySatisfied) == 1) + assert(windowPlans.count(!_.isChildOrderAlreadySatisfied) == 1) + } + } + runQueryAndCompare( + """ + |SELECT l_suppkey, + | l_orderkey, + | sum(l_partkey+1) OVER ( + | PARTITION BY + | l_suppkey + | ORDER BY + | l_orderkey, l_comment + | ) AS sum_part_key, + | avg(l_partkey+1) OVER ( + | PARTITION BY + | l_suppkey + | ORDER BY + | l_orderkey + | ) AS avg_part_key + |FROM lineitem + |""".stripMargin ) { - checkGlutenOperatorMatch[WindowExecTransformer] + df => + { + val executedPlan = getExecutedPlan(df) + val windowPlans = executedPlan + .filter(_.isInstanceOf[WindowExecTransformer]) + .asInstanceOf[Seq[WindowExecTransformer]] + assert(windowPlans.size == 2) + assert(windowPlans.count(_.isChildOrderAlreadySatisfied) == 1) + assert(windowPlans.count(!_.isChildOrderAlreadySatisfied) == 1) + } } } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala index 8a5af24dfa50..9d9c7cdea93f 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala @@ -76,7 +76,14 @@ case class WindowExecTransformer( } } - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def outputOrdering: Seq[SortOrder] = { + if (!BackendsApiManager.getSettings.requiredChildOrderingForWindow()) { + // Velox SortWindow can make sure the 
output order. + partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec + } else { + child.outputOrdering + } + } override def outputPartitioning: Partitioning = child.outputPartitioning @@ -198,7 +205,8 @@ case class WindowExecTransformer( override protected def withNewChildInternal(newChild: SparkPlan): WindowExecTransformer = copy(child = newChild) - private def isChildOrderAlreadySatisfied: Boolean = { + // This method is public for testing purposes only. + def isChildOrderAlreadySatisfied: Boolean = { val requiredOrdering = partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec logInfo( s"Check if child order already satisfied, " +