From c00c10950cd8ba074fe5c24343a2f606fe3a88fb Mon Sep 17 00:00:00 2001 From: hsiang-c Date: Fri, 29 Aug 2025 16:21:35 -0700 Subject: [PATCH 1/3] Align sort constraints w/ arrow-rs --- .../apache/comet/serde/QueryPlanSerde.scala | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index bef7e15e10..c96e56de64 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2255,28 +2255,35 @@ object QueryPlanSerde extends Logging with CometExprShim { } - // TODO: Remove this constraint when we upgrade to new arrow-rs including - // https://github.com/apache/arrow-rs/pull/6225 + /** + * Align w/ Arrow's [[https://github.com/apache/arrow-rs/blob/55.2.0/arrow-ord/src/rank.rs#L30-L40 can_rank]] + * and [[https://github.com/apache/arrow-rs/blob/55.2.0/arrow-ord/src/sort.rs#L193-L215 can_sort_to_indices]] + * + * TODO: Include SparkSQL's [[YearMonthIntervalType]] and [[DayTimeIntervalType]] + */ def supportedSortType(op: SparkPlan, sortOrder: Seq[SortOrder]): Boolean = { def canRank(dt: DataType): Boolean = { dt match { case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | - _: DoubleType | _: TimestampType | _: DecimalType | _: DateType => + _: DoubleType | _: DecimalType => true - case _: BinaryType | _: StringType => true + case _: DateType | _: TimestampType | _: TimestampNTZType => + true + case _: BooleanType | _: BinaryType | _: StringType => true case _ => false } } if (sortOrder.length == 1) { val canSort = sortOrder.head.dataType match { - case _: BooleanType => true case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | - _: DoubleType | _: TimestampType | _: TimestampNTZType | _: DecimalType | - _: DateType => + _: DoubleType | _: DecimalType => + true + case _: DateType | _: TimestampType | _: TimestampNTZType => true - case _: BinaryType | _: StringType => true + case _: BooleanType | _: BinaryType | _: StringType => true case ArrayType(elementType, _) => canRank(elementType) + case MapType(_, valueType, _) => canRank(valueType) case _ => false } if (!canSort) { From 6b1762e89ae32e1a49f50a7a0e3c7b1b65ff5e21 Mon Sep 17 00:00:00 2001 From: hsiang-c Date: Fri, 29 Aug 2025 16:22:35 -0700 Subject: [PATCH 2/3] Test BooleanType, TimestampNTZType and MapType --- .../apache/comet/exec/CometExecSuite.scala | 108 ++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 339f90e81c..10c6aaa201 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -201,6 +201,114 @@ class CometExecSuite extends CometTestBase { } } + test("Sort on array of boolean") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { + + sql(""" + |CREATE OR REPLACE TEMPORARY VIEW test_list AS SELECT * FROM VALUES + | (array(true)), + | (array(false)), + | (array(false)), + | (array(false)) AS test(arr) + |""".stripMargin) + + val df = sql(""" + SELECT * FROM test_list ORDER BY arr + |""".stripMargin) + val sort = stripAQEPlan(df.queryExecution.executedPlan).collect { case s: CometSortExec => + s + }.headOption + assert(sort.isDefined) + } + } + + test("Sort on TimestampNTZType") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { + + sql(""" + |CREATE OR REPLACE TEMPORARY VIEW test_list AS SELECT * FROM VALUES + | (TIMESTAMP_NTZ'2025-08-29 00:00:00'), + | (TIMESTAMP_NTZ'2023-07-07 00:00:00'), + | (convert_timezone('Asia/Kathmandu', 'UTC', TIMESTAMP_NTZ'2023-07-07 00:00:00')), + | (convert_timezone('America/Los_Angeles', 'UTC', TIMESTAMP_NTZ'2023-07-07 00:00:00')), + | (TIMESTAMP_NTZ'1969-12-31 00:00:00') AS test(ts_ntz) + |""".stripMargin) + + val df = sql(""" + SELECT * FROM test_list ORDER BY ts_ntz + |""".stripMargin) + checkSparkAnswer(df) + val sort = stripAQEPlan(df.queryExecution.executedPlan).collect { case s: CometSortExec => + s + }.headOption + assert(sort.isDefined) + } + } + + test("Sort on map w/ TimestampNTZType values") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { + + sql(""" + |CREATE OR REPLACE TEMPORARY VIEW test_map AS SELECT * FROM VALUES + | (map('a', TIMESTAMP_NTZ'2025-08-29 00:00:00')), + | (map('b', TIMESTAMP_NTZ'2023-07-07 00:00:00')), + | (map('c', convert_timezone('Asia/Kathmandu', 'UTC', TIMESTAMP_NTZ'2023-07-07 00:00:00'))), + | (map('d', convert_timezone('America/Los_Angeles', 'UTC', TIMESTAMP_NTZ'2023-07-07 00:00:00'))) AS test(map) + |""".stripMargin) + + val df = sql(""" + SELECT * FROM test_map ORDER BY map_values(map) DESC + |""".stripMargin) + checkSparkAnswer(df) + val sort = stripAQEPlan(df.queryExecution.executedPlan).collect { case s: CometSortExec => + s + }.headOption + assert(sort.isDefined) + } + } + + test("Sort on map w/ boolean values") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_EXEC_SORT_ENABLED.key -> "true", + CometConf.COMET_SHUFFLE_MODE.key -> "jvm") { + + sql(""" + |CREATE OR REPLACE TEMPORARY VIEW test_map AS SELECT * FROM VALUES + | (map('a', true)), + | (map('b', true)), + | (map('c', false)), + | (map('d', true)) AS test(map) + |""".stripMargin) + + val df = sql(""" + SELECT * FROM test_map ORDER BY map_values(map) DESC + |""".stripMargin) + val sort = stripAQEPlan(df.queryExecution.executedPlan).collect { case s: CometSortExec => + s + }.headOption + assert(sort.isDefined) + } + } + test( "fall back to Spark when the partition spec and order spec are not the same for window function") { withTempView("test") { From 58bf0ed31247b0818fdee3f8f305508fc9ef19df Mon Sep 17 00:00:00 2001 From: hsiang-c Date: Tue, 2 Sep 2025 11:28:33 -0700 Subject: [PATCH 3/3] Style fix --- .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index c96e56de64..0d38af216f 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2255,12 +2255,15 @@ object QueryPlanSerde extends Logging with CometExprShim { } + // scalastyle:off /** - * Align w/ Arrow's [[https://github.com/apache/arrow-rs/blob/55.2.0/arrow-ord/src/rank.rs#L30-L40 can_rank]] - * and [[https://github.com/apache/arrow-rs/blob/55.2.0/arrow-ord/src/sort.rs#L193-L215 can_sort_to_indices]] + * Align w/ Arrow's + * [[https://github.com/apache/arrow-rs/blob/55.2.0/arrow-ord/src/rank.rs#L30-L40 can_rank]] and + * [[https://github.com/apache/arrow-rs/blob/55.2.0/arrow-ord/src/sort.rs#L193-L215 can_sort_to_indices]] * * TODO: Include SparkSQL's [[YearMonthIntervalType]] and [[DayTimeIntervalType]] */ + // scalastyle:off def supportedSortType(op: SparkPlan, sortOrder: Seq[SortOrder]): Boolean = { def canRank(dt: DataType): Boolean = { dt match {