Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,74 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
}
}

test("optimize sort window to streaming window") {
withSQLConf(VeloxConfig.COLUMNAR_VELOX_WINDOW_TYPE.key -> "sort") {
Comment thread
zjuwangg marked this conversation as resolved.
runQueryAndCompare(
"""
|SELECT avg(sum_part_key) OVER (
| PARTITION BY
| l_suppkey
| ORDER BY
| l_orderkey
| ) AS avg_sum_part_key
|FROM (
| SELECT l_suppkey,
| l_orderkey,
| sum(l_partkey+1) OVER (
| PARTITION BY
| l_suppkey
| ORDER BY
| l_orderkey
| ) AS sum_part_key
| FROM lineitem
| )
|""".stripMargin
) {
df =>
{
val executedPlan = getExecutedPlan(df)
val windowPlans = executedPlan
.filter(_.isInstanceOf[WindowExecTransformer])
.asInstanceOf[Seq[WindowExecTransformer]]
assert(windowPlans.size == 2)
assert(windowPlans.count(_.isChildOrderAlreadySatisfied) == 1)
assert(windowPlans.count(!_.isChildOrderAlreadySatisfied) == 1)
}
}

runQueryAndCompare(
"""
|SELECT l_suppkey,
| l_orderkey,
| sum(l_partkey+1) OVER (
| PARTITION BY
| l_suppkey
| ORDER BY
| l_orderkey, l_comment
| ) AS sum_part_key,
| avg(l_partkey+1) OVER (
| PARTITION BY
| l_suppkey
| ORDER BY
| l_orderkey
| ) AS avg_part_key
|FROM lineitem
|""".stripMargin
) {
df =>
{
val executedPlan = getExecutedPlan(df)
val windowPlans = executedPlan
.filter(_.isInstanceOf[WindowExecTransformer])
.asInstanceOf[Seq[WindowExecTransformer]]
assert(windowPlans.size == 2)
assert(windowPlans.count(_.isChildOrderAlreadySatisfied) == 1)
assert(windowPlans.count(!_.isChildOrderAlreadySatisfied) == 1)
}
}
}
}

test("df.count()") {
val df = runQueryAndCompare("select * from lineitem limit 1") { _ => }
checkLengthAndPlan(df, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,14 @@ case class WindowExecTransformer(
}
}

override def outputOrdering: Seq[SortOrder] = child.outputOrdering
override def outputOrdering: Seq[SortOrder] = {
if (!BackendsApiManager.getSettings.requiredChildOrderingForWindow()) {
// Velox SortWindow can make sure the output order.
partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec
} else {
child.outputOrdering
}
}

override def outputPartitioning: Partitioning = child.outputPartitioning

Expand All @@ -85,7 +92,20 @@ case class WindowExecTransformer(
val windowParametersStr = new StringBuffer("WindowParameters:")
// isStreaming: 1 for streaming, 0 for sort
val isStreaming: Int =
if (BackendsApiManager.getSettings.requiredChildOrderingForWindow()) 1 else 0
if (BackendsApiManager.getSettings.requiredChildOrderingForWindow()) {
1
} else {
val isAlreadySatisfied = isChildOrderAlreadySatisfied
// Optimize sort window to streaming window when child order already
// match window requirements.
if (isAlreadySatisfied) {
logInfo("Mark window type to streaming since child order is already satisfied.")
1
} else {
logInfo("Mark window type to sort since child order is not satisfied.")
0
}
}

windowParametersStr
.append("isStreaming=")
Expand Down Expand Up @@ -184,4 +204,22 @@ case class WindowExecTransformer(

override protected def withNewChildInternal(newChild: SparkPlan): WindowExecTransformer =
copy(child = newChild)

// We make this method public for test purpose.
def isChildOrderAlreadySatisfied: Boolean = {
val requiredOrdering = partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec
logInfo(
s"Check if child order already satisfied, " +
s"current window node is ${this.treeString}, " +
s"required ordering is ${requiredOrdering.map(_.treeString).mkString(",")}, " +
s"child is ${child.treeString}, " +
s"child.outputOrder is ${child.outputOrdering.map(_.treeString).mkString(",")}")
if (requiredOrdering.size <= child.outputOrdering.size) {
SortOrder.orderingSatisfies(
child.outputOrdering.take(requiredOrdering.size),
requiredOrdering)
} else {
false
}
}
}
Loading