From 19196df7b0923e4ef10bf9b00373e11f2b81e670 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 1 Apr 2026 13:11:21 -0600
Subject: [PATCH 1/5] fix: use UTC for Arrow schema timezone in
 CometSparkToColumnarExec and CometLocalTableScanExec

CometSparkToColumnarExec and CometLocalTableScanExec used the session
timezone for Arrow schema metadata, but the native side always
deserializes Timestamp as Timestamp(Microsecond, Some("UTC")). This
causes an unnecessary cast in ScanExec when timezones don't match.

Spark's internal timestamps are always UTC microseconds, so the timezone
in the Arrow schema is purely metadata. Using UTC matches
NativeBatchReader and the native serde.

Also remove outdated "known issues with non-UTC timezones" warnings from
config descriptions and move configs from testing to exec category.

Closes #2720
---
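Note (illustrative only, not part of this commit; the field and variable
names below are hypothetical): a minimal sketch of the schema-metadata
point above, using the Arrow Java API. Spark stores TimestampType values
as microseconds since the UTC epoch, so the two fields below describe
identical physical data and differ only in metadata.

    import org.apache.arrow.vector.types.TimeUnit
    import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType}

    // Timestamp field tagged with UTC, matching what the native side
    // expects: Timestamp(Microsecond, Some("UTC")).
    val utcField = new Field(
      "ts",
      FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")),
      null)

    // Same unit, same physical values, different timezone metadata.
    // Emitting this variant is what triggered the unnecessary cast in
    // ScanExec.
    val sessionTzField = new Field(
      "ts",
      FieldType.nullable(
        new ArrowType.Timestamp(TimeUnit.MICROSECOND, "America/Los_Angeles")),
      null)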
 .../scala/org/apache/comet/CometConf.scala    | 21 +++++------
 .../sql/comet/CometLocalTableScanExec.scala   |  3 +-
 .../sql/comet/CometSparkToColumnarExec.scala  |  7 +++-
 .../apache/comet/exec/CometExecSuite.scala    | 36 +++++++++++++++++++
 4 files changed, 53 insertions(+), 14 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 046ccf0b1c..32522bf51f 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -222,31 +222,28 @@ object CometConf extends ShimCometConf {

   val COMET_CONVERT_FROM_PARQUET_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.convert.parquet.enabled")
-      .category(CATEGORY_TESTING)
+      .category(CATEGORY_EXEC)
       .doc(
         "When enabled, data from Spark (non-native) Parquet v1 and v2 scans will be converted to " +
-          "Arrow format. This is an experimental feature and has known issues with " +
-          "non-UTC timezones.")
+          "Arrow format.")
       .booleanConf
       .createWithDefault(false)

   val COMET_CONVERT_FROM_JSON_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.convert.json.enabled")
-      .category(CATEGORY_TESTING)
+      .category(CATEGORY_EXEC)
       .doc(
         "When enabled, data from Spark (non-native) JSON v1 and v2 scans will be converted to " +
-          "Arrow format. This is an experimental feature and has known issues with " +
-          "non-UTC timezones.")
+          "Arrow format.")
       .booleanConf
       .createWithDefault(false)

   val COMET_CONVERT_FROM_CSV_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.convert.csv.enabled")
-      .category(CATEGORY_TESTING)
+      .category(CATEGORY_EXEC)
       .doc(
         "When enabled, data from Spark (non-native) CSV v1 and v2 scans will be converted to " +
-          "Arrow format. This is an experimental feature and has known issues with " +
-          "non-UTC timezones.")
+          "Arrow format.")
       .booleanConf
       .createWithDefault(false)

@@ -743,17 +740,17 @@ object CometConf extends ShimCometConf {

   val COMET_SPARK_TO_ARROW_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.sparkToColumnar.enabled")
-      .category(CATEGORY_TESTING)
+      .category(CATEGORY_EXEC)
       .doc("Whether to enable Spark to Arrow columnar conversion. When this is turned on, " +
         "Comet will convert operators in " +
         "`spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format before " +
-        "processing. This is an experimental feature and has known issues with non-UTC timezones.")
+        "processing.")
       .booleanConf
       .createWithDefault(false)

   val COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST: ConfigEntry[Seq[String]] =
     conf("spark.comet.sparkToColumnar.supportedOperatorList")
-      .category(CATEGORY_TESTING)
+      .category(CATEGORY_EXEC)
       .doc("A comma-separated list of operators that will be converted to Arrow columnar " +
         s"format when `${COMET_SPARK_TO_ARROW_ENABLED.key}` is true.")
       .stringConf
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala
index 68a2ebf8ec..622168bcc9 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala
@@ -66,7 +66,8 @@ case class CometLocalTableScanExec(
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val numInputRows = longMetric("numOutputRows")
     val maxRecordsPerBatch = CometConf.COMET_BATCH_SIZE.get(conf)
-    val timeZoneId = conf.sessionLocalTimeZone
+    // Use UTC to match native side expectations. See CometSparkToColumnarExec.
+    val timeZoneId = "UTC"
     rdd.mapPartitionsInternal { sparkBatches =>
       val context = TaskContext.get()
       val batches = CometArrowConverters.rowToArrowBatchIter(
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
index 8447c9d044..efe6a97d40 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometSparkToColumnarExec.scala
@@ -97,7 +97,12 @@ case class CometSparkToColumnarExec(child: SparkPlan)
     val numOutputBatches = longMetric("numOutputBatches")
     val conversionTime = longMetric("conversionTime")
     val maxRecordsPerBatch = CometConf.COMET_BATCH_SIZE.get(conf)
-    val timeZoneId = conf.sessionLocalTimeZone
+    // Use UTC for Arrow schema timezone to match the native side, which always
+    // deserializes Timestamp as Timestamp(Microsecond, Some("UTC")). Spark's internal
+    // timestamp representation is always UTC microseconds, so the timezone here is
+    // purely schema metadata. Using session timezone would cause Arrow RowConverter
+    // schema mismatch errors in non-UTC sessions. See COMET-2720.
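Note (illustrative only, not part of this commit): a hypothetical
spark-shell sketch of the conversion path that the UTC change affects.
The Parquet path is made up, and "spark.comet.scan.enabled" is assumed
to be the key behind COMET_NATIVE_SCAN_ENABLED; the other keys appear
verbatim in CometConf above.

    // Disable the native scan so Spark performs the Parquet read, then let
    // Comet convert the scan output to Arrow in a non-UTC session.
    spark.conf.set("spark.comet.scan.enabled", "false")
    spark.conf.set("spark.comet.sparkToColumnar.enabled", "true")
    spark.conf.set("spark.comet.convert.parquet.enabled", "true")
    spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
    spark.read.parquet("/tmp/ts_data").orderBy("ts").show()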
+ val timeZoneId = "UTC" val schema = child.schema if (child.supportsColumnar) { diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index aff1816265..9cb89a7a11 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -2173,6 +2173,42 @@ class CometExecSuite extends CometTestBase { } } + test("LocalTableScanExec with timestamps in non-UTC timezone") { + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") { + val df = Seq( + (1, java.sql.Timestamp.valueOf("2024-01-15 10:30:00")), + (2, java.sql.Timestamp.valueOf("2024-06-15 14:00:00")), + (3, java.sql.Timestamp.valueOf("2024-12-25 08:00:00"))) + .toDF("id", "ts") + .orderBy("ts") + checkSparkAnswerAndOperator(df) + } + } + + test("sort on timestamps with non-UTC timezone via LocalTableScan") { + // When session timezone is non-UTC, CometLocalTableScanExec and + // CometSparkToColumnarExec must use UTC for the Arrow schema timezone + // to match the native side's expectations. Without this, the native + // ScanExec sees a timezone mismatch and performs an unnecessary cast. + // The cast is currently a no-op (Arrow timestamps with timezone are + // always UTC microseconds), but using UTC avoids the overhead and + // keeps schemas consistent throughout the native plan. + withSQLConf( + CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true", + SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") { + val df = Seq( + (1, java.sql.Timestamp.valueOf("2024-01-15 10:30:00")), + (2, java.sql.Timestamp.valueOf("2024-06-15 14:00:00")), + (3, java.sql.Timestamp.valueOf("2024-12-25 08:00:00"))) + .toDF("id", "ts") + .repartition(2) + .orderBy("ts") + checkSparkAnswer(df) + } + } + } case class BucketedTableTestSpec( From 9e8bb44f9f58feb8f283db31be15fbaa05dedd48 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 10 Apr 2026 16:24:47 -0600 Subject: [PATCH 2/5] test: add SparkToColumnar non-UTC timezone test Add test for CometSparkToColumnarExec with non-UTC session timezone to cover the Parquet convert path in addition to the LocalTableScan tests. 
 .../apache/comet/exec/CometExecSuite.scala | 21 +++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 9cb89a7a11..364e2cd651 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -2187,6 +2187,27 @@ class CometExecSuite extends CometTestBase {
     }
   }

+  test("SparkToColumnar with timestamps in non-UTC timezone") {
+    withTempDir { dir =>
+      val path = new java.io.File(dir, "data").getAbsolutePath
+      Seq(
+        (1, java.sql.Timestamp.valueOf("2024-01-15 10:30:00")),
+        (2, java.sql.Timestamp.valueOf("2024-06-15 14:00:00")),
+        (3, java.sql.Timestamp.valueOf("2024-12-25 08:00:00")))
+        .toDF("id", "ts")
+        .write
+        .parquet(path)
+      withSQLConf(
+        CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "false",
+        CometConf.COMET_SPARK_TO_ARROW_ENABLED.key -> "true",
+        CometConf.COMET_CONVERT_FROM_PARQUET_ENABLED.key -> "true",
+        SESSION_LOCAL_TIMEZONE.key -> "America/Los_Angeles") {
+        val df = spark.read.parquet(path).orderBy("ts")
+        checkSparkAnswerAndOperator(df)
+      }
+    }
+  }
+
   test("sort on timestamps with non-UTC timezone via LocalTableScan") {
     // When session timezone is non-UTC, CometLocalTableScanExec and
     // CometSparkToColumnarExec must use UTC for the Arrow schema timezone

From 652b0a383024e54ed0dff25961ec463250185832 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 13 Apr 2026 15:21:02 -0600
Subject: [PATCH 3/5] spotless

---
 spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 98beb27ebe..22983119bb 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -2301,7 +2301,6 @@ class CometExecSuite extends CometTestBase {
     }
   }

-
 }

 case class BucketedTableTestSpec(

From 8f4ecd2a90bf6d2a74e6d803bd8b9ddbff7df3d0 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 16 Apr 2026 08:58:50 -0600
Subject: [PATCH 4/5] update some config docs

---
 .../main/scala/org/apache/comet/CometConf.scala | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 046ccf0b1c..4cac6d2494 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -225,8 +225,7 @@ object CometConf extends ShimCometConf {
       .category(CATEGORY_TESTING)
       .doc(
         "When enabled, data from Spark (non-native) Parquet v1 and v2 scans will be converted to " +
-          "Arrow format. This is an experimental feature and has known issues with " +
-          "non-UTC timezones.")
+          "Arrow format.")
       .booleanConf
       .createWithDefault(false)

@@ -235,8 +234,7 @@ object CometConf extends ShimCometConf {
       .category(CATEGORY_TESTING)
       .doc(
         "When enabled, data from Spark (non-native) JSON v1 and v2 scans will be converted to " +
-          "Arrow format. This is an experimental feature and has known issues with " +
-          "non-UTC timezones.")
+          "Arrow format.")
       .booleanConf
       .createWithDefault(false)

@@ -245,8 +243,7 @@ object CometConf extends ShimCometConf {
       .category(CATEGORY_TESTING)
       .doc(
         "When enabled, data from Spark (non-native) CSV v1 and v2 scans will be converted to " +
-          "Arrow format. This is an experimental feature and has known issues with " +
-          "non-UTC timezones.")
+          "Arrow format.")
       .booleanConf
       .createWithDefault(false)

@@ -381,8 +378,10 @@ object CometConf extends ShimCometConf {
   val COMET_REPLACE_SMJ: ConfigEntry[Boolean] =
     conf(s"$COMET_EXEC_CONFIG_PREFIX.replaceSortMergeJoin")
       .category(CATEGORY_EXEC)
-      .doc("Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin " +
-        s"for improved performance. This feature is not stable yet. $TUNING_GUIDE.")
+      .doc(
+        "When enabled, Comet will replace sort-merge joins with hash joins, where possible. " +
+          "This can result in better performance but there is no spilling support, so this can " +
+          s"cause OOMs. $TUNING_GUIDE.")
       .booleanConf
       .createWithDefault(false)

@@ -747,7 +746,7 @@ object CometConf extends ShimCometConf {
       .doc("Whether to enable Spark to Arrow columnar conversion. When this is turned on, " +
         "Comet will convert operators in " +
         "`spark.comet.sparkToColumnar.supportedOperatorList` into Arrow columnar format before " +
-        "processing. This is an experimental feature and has known issues with non-UTC timezones.")
+        "processing.")
       .booleanConf
       .createWithDefault(false)

From 4d4f95b1f12dd7ca9951f7214de32c792509ac57 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 16 Apr 2026 09:02:38 -0600
Subject: [PATCH 5/5] Revert

---
 common/src/main/scala/org/apache/comet/CometConf.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index e8b003ee7e..32522bf51f 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -378,10 +378,8 @@ object CometConf extends ShimCometConf {
   val COMET_REPLACE_SMJ: ConfigEntry[Boolean] =
     conf(s"$COMET_EXEC_CONFIG_PREFIX.replaceSortMergeJoin")
       .category(CATEGORY_EXEC)
-      .doc(
-        "When enabled, Comet will replace sort-merge joins with hash joins, where possible. " +
-          "This can result in better performance but there is no spilling support, so this can " +
-          s"cause OOMs. $TUNING_GUIDE.")
+      .doc("Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin " +
+        s"for improved performance. This feature is not stable yet. $TUNING_GUIDE.")
       .booleanConf
       .createWithDefault(false)