Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2501,10 +2501,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim

case SortExec(sortOrder, _, child, _)
if isCometOperatorEnabled(op.conf, CometConf.OPERATOR_SORT) =>
// TODO: Remove this constraint when we upgrade to new arrow-rs including
// https://github.com/apache/arrow-rs/pull/6225
if (child.output.length == 1 && child.output.head.dataType.isInstanceOf[StructType]) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be the column to sort (i.e., sort order) instead of child output. I made it wrong in #811.

withInfo(op, "Sort on single struct column is not supported")
if (!supportedSortType(op, sortOrder)) {
return None
}

Expand Down Expand Up @@ -3053,4 +3050,15 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
}

}

// TODO: Remove this constraint when we upgrade to new arrow-rs including
// https://github.com/apache/arrow-rs/pull/6225
/**
 * Checks whether the requested sort keys can be handled by the native sort
 * implementation (DataFusion's SortExec, backed by arrow-rs).
 *
 * arrow-rs currently cannot sort a single struct column, so that case is
 * rejected and the fallback reason is attached to the plan node.
 *
 * @param op        the Spark plan node being converted; receives the fallback
 *                  reason via `withInfo` when unsupported
 * @param sortOrder the sort keys requested by the plan (not the child output)
 * @return true if the native sort supports these sort keys, false otherwise
 */
def supportedSortType(op: SparkPlan, sortOrder: Seq[SortOrder]): Boolean = {
  // NOTE(review): only the single struct-typed sort key is rejected here; per
  // the PR discussion there may be additional unsupported key types — confirm
  // and extend in a follow-up.
  if (sortOrder.length == 1 && sortOrder.head.dataType.isInstanceOf[StructType]) {
    withInfo(op, "Sort on single struct column is not supported")
    false
  } else {
    true
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.comet.serde.QueryPlanSerde.exprToProto
import org.apache.comet.serde.QueryPlanSerde.supportedSortType
import org.apache.comet.shims.ShimCometTakeOrderedAndProjectExec

/**
Expand Down Expand Up @@ -137,6 +138,6 @@ object CometTakeOrderedAndProjectExec extends ShimCometTakeOrderedAndProjectExec
val exprs = plan.projectList.map(exprToProto(_, plan.child.output))
val sortOrders = plan.sortOrder.map(exprToProto(_, plan.child.output))
exprs.forall(_.isDefined) && sortOrders.forall(_.isDefined) && getOffset(plan).getOrElse(
0) == 0
0) == 0 && supportedSortType(plan, plan.sortOrder)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CometTakeOrderedAndProjectExec will invoke DataFusion SortExec operator, so we should also check for it.

}
}
19 changes: 17 additions & 2 deletions spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,25 @@ class CometExecSuite extends CometTestBase {
CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key -> "true",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
val data =
val data1 =
Seq(Tuple1(null), Tuple1((1, "a")), Tuple1((2, null)), Tuple1((3, "b")), Tuple1(null))

withParquetFile(data) { file =>
withParquetFile(data1) { file =>
readParquetFile(file) { df =>
val sort = df.sort("_1")
checkSparkAnswer(sort)
}
}

val data2 =
Seq(
Tuple2(null, 1),
Tuple2((1, "a"), 2),
Tuple2((2, null), 3),
Tuple2((3, "b"), 5),
Tuple2(null, 6))

withParquetFile(data2) { file =>
readParquetFile(file) { df =>
val sort = df.sort("_1")
checkSparkAnswer(sort)
Expand Down