diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index f944b2b045..ed98781363 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2501,10 +2501,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
       case SortExec(sortOrder, _, child, _)
           if isCometOperatorEnabled(op.conf, CometConf.OPERATOR_SORT) =>
-        // TODO: Remove this constraint when we upgrade to new arrow-rs including
-        // https://github.com/apache/arrow-rs/pull/6225
-        if (child.output.length == 1 && child.output.head.dataType.isInstanceOf[StructType]) {
-          withInfo(op, "Sort on single struct column is not supported")
+        if (!supportedSortType(op, sortOrder)) {
           return None
         }
@@ -3053,4 +3050,15 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
     }
   }
+
+  // TODO: Remove this constraint when we upgrade to new arrow-rs including
+  // https://github.com/apache/arrow-rs/pull/6225
+  def supportedSortType(op: SparkPlan, sortOrder: Seq[SortOrder]): Boolean = {
+    if (sortOrder.length == 1 && sortOrder.head.dataType.isInstanceOf[StructType]) {
+      withInfo(op, "Sort on single struct column is not supported")
+      false
+    } else {
+      true
+    }
+  }
 }
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
index 8f4c7424ea..f808a2b1d8 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 import org.apache.comet.serde.QueryPlanSerde.exprToProto
+import org.apache.comet.serde.QueryPlanSerde.supportedSortType
 import org.apache.comet.shims.ShimCometTakeOrderedAndProjectExec
 
 /**
@@ -137,6 +138,6 @@ object CometTakeOrderedAndProjectExec extends ShimCometTakeOrderedAndProjectExec
     val exprs = plan.projectList.map(exprToProto(_, plan.child.output))
     val sortOrders = plan.sortOrder.map(exprToProto(_, plan.child.output))
     exprs.forall(_.isDefined) && sortOrders.forall(_.isDefined) && getOffset(plan).getOrElse(
-      0) == 0
+      0) == 0 && supportedSortType(plan, plan.sortOrder)
   }
 }
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index b5c3bd0911..5ca9401c68 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -72,10 +72,25 @@ class CometExecSuite extends CometTestBase {
         CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key -> "true",
         CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
         CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
-      val data =
+      val data1 =
         Seq(Tuple1(null), Tuple1((1, "a")), Tuple1((2, null)), Tuple1((3, "b")), Tuple1(null))
-      withParquetFile(data) { file =>
+      withParquetFile(data1) { file =>
+        readParquetFile(file) { df =>
+          val sort = df.sort("_1")
+          checkSparkAnswer(sort)
+        }
+      }
+
+      val data2 =
+        Seq(
+          Tuple2(null, 1),
+          Tuple2((1, "a"), 2),
+          Tuple2((2, null), 3),
+          Tuple2((3, "b"), 5),
+          Tuple2(null, 6))
+
+      withParquetFile(data2) { file =>
         readParquetFile(file) { df =>
           val sort = df.sort("_1")
           checkSparkAnswer(sort)
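
Note: the patch extracts the single-struct-column guard into a shared supportedSortType helper so that both the SortExec path in QueryPlanSerde and the CometTakeOrderedAndProjectExec support check apply the same constraint. A minimal sketch of the two plan shapes the shared guard now covers; the session setup and data below are illustrative only, not taken from the patch:

    // Sketch, assuming a local SparkSession with the Comet extension enabled.
    // Sorting on a single struct column falls back to Spark until arrow-rs
    // picks up https://github.com/apache/arrow-rs/pull/6225.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    // One column of struct type, built from nested tuples.
    val df = Seq(Tuple1((1, "a")), Tuple1((2, "b"))).toDF("_1")

    // Plans as SortExec; serialization is rejected by supportedSortType.
    df.sort("_1").collect()

    // Sort plus limit plans as TakeOrderedAndProjectExec; this path previously
    // skipped the struct check, which the shared helper now closes.
    df.sort("_1").limit(1).collect()

Routing both operators through one helper also keeps the arrow-rs TODO in a single place, so the constraint can be removed in one spot once the upgrade lands.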