diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala index e14dd1fcb5a8..163a21c2a502 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala @@ -524,6 +524,74 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa } } + test("optimize sort window to streaming window") { + withSQLConf(VeloxConfig.COLUMNAR_VELOX_WINDOW_TYPE.key -> "sort") { + runQueryAndCompare( + """ + |SELECT avg(sum_part_key) OVER ( + | PARTITION BY + | l_suppkey + | ORDER BY + | l_orderkey + | ) AS avg_sum_part_key + |FROM ( + | SELECT l_suppkey, + | l_orderkey, + | sum(l_partkey+1) OVER ( + | PARTITION BY + | l_suppkey + | ORDER BY + | l_orderkey + | ) AS sum_part_key + | FROM lineitem + | ) + |""".stripMargin + ) { + df => + { + val executedPlan = getExecutedPlan(df) + val windowPlans = executedPlan + .filter(_.isInstanceOf[WindowExecTransformer]) + .asInstanceOf[Seq[WindowExecTransformer]] + assert(windowPlans.size == 2) + assert(windowPlans.count(_.isChildOrderAlreadySatisfied) == 1) + assert(windowPlans.count(!_.isChildOrderAlreadySatisfied) == 1) + } + } + + runQueryAndCompare( + """ + |SELECT l_suppkey, + | l_orderkey, + | sum(l_partkey+1) OVER ( + | PARTITION BY + | l_suppkey + | ORDER BY + | l_orderkey, l_comment + | ) AS sum_part_key, + | avg(l_partkey+1) OVER ( + | PARTITION BY + | l_suppkey + | ORDER BY + | l_orderkey + | ) AS avg_part_key + |FROM lineitem + |""".stripMargin + ) { + df => + { + val executedPlan = getExecutedPlan(df) + val windowPlans = executedPlan + .filter(_.isInstanceOf[WindowExecTransformer]) + .asInstanceOf[Seq[WindowExecTransformer]] + assert(windowPlans.size == 2) + assert(windowPlans.count(_.isChildOrderAlreadySatisfied) == 1) + assert(windowPlans.count(!_.isChildOrderAlreadySatisfied) == 1) + } + } + } + } + test("df.count()") { val df = runQueryAndCompare("select * from lineitem limit 1") { _ => } checkLengthAndPlan(df, 1) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala index 5cf506adc46d..9d9c7cdea93f 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala @@ -76,7 +76,14 @@ case class WindowExecTransformer( } } - override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def outputOrdering: Seq[SortOrder] = { + if (!BackendsApiManager.getSettings.requiredChildOrderingForWindow()) { + // Velox SortWindow can make sure the output order. + partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec + } else { + child.outputOrdering + } + } override def outputPartitioning: Partitioning = child.outputPartitioning @@ -85,7 +92,20 @@ case class WindowExecTransformer( val windowParametersStr = new StringBuffer("WindowParameters:") // isStreaming: 1 for streaming, 0 for sort val isStreaming: Int = - if (BackendsApiManager.getSettings.requiredChildOrderingForWindow()) 1 else 0 + if (BackendsApiManager.getSettings.requiredChildOrderingForWindow()) { + 1 + } else { + val isAlreadySatisfied = isChildOrderAlreadySatisfied + // Optimize sort window to streaming window when child order already + // match window requirements. + if (isAlreadySatisfied) { + logInfo("Mark window type to streaming since child order is already satisfied.") + 1 + } else { + logInfo("Mark window type to sort since child order is not satisfied.") + 0 + } + } windowParametersStr .append("isStreaming=") @@ -184,4 +204,22 @@ case class WindowExecTransformer( override protected def withNewChildInternal(newChild: SparkPlan): WindowExecTransformer = copy(child = newChild) + + // We make this method public for test purpose. + def isChildOrderAlreadySatisfied: Boolean = { + val requiredOrdering = partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec + logInfo( + s"Check if child order already satisfied, " + + s"current window node is ${this.treeString}, " + + s"required ordering is ${requiredOrdering.map(_.treeString).mkString(",")}, " + + s"child is ${child.treeString}, " + + s"child.outputOrder is ${child.outputOrdering.map(_.treeString).mkString(",")}") + if (requiredOrdering.size <= child.outputOrdering.size) { + SortOrder.orderingSatisfies( + child.outputOrdering.take(requiredOrdering.size), + requiredOrdering) + } else { + false + } + } }