From 62c6a5537a958f52f23cb8b5974217c39673a70f Mon Sep 17 00:00:00 2001 From: Ankita Victor-Levi Date: Thu, 12 Feb 2026 14:41:07 +0000 Subject: [PATCH 1/2] Fallback TimestampNTZ --- .../VeloxParquetDataTypeValidationSuite.scala | 14 +++++ .../spark/sql/utils/SparkArrowUtil.scala | 8 ++- .../apache/gluten/execution/DeltaSuite.scala | 62 +++++++++++++++++++ .../columnar/validator/Validators.scala | 26 ++++++++ 4 files changed, 108 insertions(+), 2 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala index cf63cd445306..645c4e3dc853 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala @@ -465,6 +465,20 @@ class VeloxParquetDataTypeValidationSuite extends VeloxWholeStageTransformerSuit } } + test("Fallback for TimestampNTZ type scan") { + withTempDir { + dir => + val path = new File(dir, "ntz_data").toURI.getPath + val inputDf = + spark.sql("SELECT CAST('2024-01-01 00:00:00' AS TIMESTAMP_NTZ) AS ts_ntz") + inputDf.write.format("parquet").save(path) + val df = spark.read.format("parquet").load(path) + val executedPlan = getExecutedPlan(df) + assert(!executedPlan.exists(plan => plan.isInstanceOf[BatchScanExecTransformer])) + checkAnswer(df, inputDf) + } + } + test("Velox Parquet Write") { withSQLConf((GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")) { withTempDir { diff --git a/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala b/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala index da3f5c0708dc..619686be7c9b 100644 --- a/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala +++ b/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala @@ -50,6 +50,8 @@ object SparkArrowUtil { } else { new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC") } + case TimestampNTZType => + new ArrowType.Timestamp(TimeUnit.MICROSECOND, null) case YearMonthIntervalType.DEFAULT => new ArrowType.Interval(IntervalUnit.YEAR_MONTH) case _: ArrayType => ArrowType.List.INSTANCE @@ -72,7 +74,8 @@ object SparkArrowUtil { case ArrowType.Binary.INSTANCE => BinaryType case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale) case date: ArrowType.Date if date.getUnit == DateUnit.DAY => DateType - // TODO: Time unit is not handled. + case ts: ArrowType.Timestamp if ts.getUnit == TimeUnit.MICROSECOND && ts.getTimezone == null => + TimestampNTZType case _: ArrowType.Timestamp => TimestampType case interval: ArrowType.Interval if interval.getUnit == IntervalUnit.YEAR_MONTH => YearMonthIntervalType.DEFAULT @@ -156,7 +159,8 @@ object SparkArrowUtil { }.asJava) } - // TimestampNTZ does not support + // TimestampNTZ is not supported for native computation, but the Arrow type mapping is needed + // for row-to-columnar transitions when the fallback validator tags NTZ operators. def checkSchema(schema: StructType): Boolean = { try { SparkSchemaUtil.toArrowSchema(schema) diff --git a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala index 57e90283611c..4dc487f1bad2 100644 --- a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala +++ b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala @@ -340,4 +340,66 @@ abstract class DeltaSuite extends WholeStageTransformerSuite { } } } + + // TIMESTAMP_NTZ was introduced in Spark 3.4 / Delta 2.4 + testWithMinSparkVersion( + "delta: create table with TIMESTAMP_NTZ should fallback and return correct results", + "3.4") { + withTable("delta_ntz") { + spark.sql("CREATE TABLE delta_ntz(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP_NTZ) USING DELTA") + spark.sql("""INSERT INTO delta_ntz VALUES + |('foo','2022-01-02 03:04:05.123456','2022-01-02 03:04:05.123456')""".stripMargin) + val df = runQueryAndCompare("select * from delta_ntz", noFallBack = false) { _ => } + checkAnswer( + df, + Row( + "foo", + java.sql.Timestamp.valueOf("2022-01-02 03:04:05.123456"), + java.time.LocalDateTime.of(2022, 1, 2, 3, 4, 5, 123456000))) + } + } + + testWithMinSparkVersion( + "delta: TIMESTAMP_NTZ as partition column should fallback and return correct results", + "3.4") { + withTable("delta_ntz_part") { + spark.sql("""CREATE TABLE delta_ntz_part(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP_NTZ) + |USING DELTA PARTITIONED BY (c3)""".stripMargin) + spark.sql("""INSERT INTO delta_ntz_part VALUES + |('foo','2022-01-02 03:04:05.123456','2022-01-02 03:04:05.123456'), + |('bar','2023-06-15 10:30:00.000000','2023-06-15 10:30:00.000000')""".stripMargin) + val df = runQueryAndCompare("select * from delta_ntz_part order by c1", noFallBack = false) { + _ => + } + checkAnswer( + df, + Seq( + Row( + "bar", + java.sql.Timestamp.valueOf("2023-06-15 10:30:00"), + java.time.LocalDateTime.of(2023, 6, 15, 10, 30, 0, 0)), + Row( + "foo", + java.sql.Timestamp.valueOf("2022-01-02 03:04:05.123456"), + java.time.LocalDateTime.of(2022, 1, 2, 3, 4, 5, 123456000)) + ) + ) + } + } + + testWithMinSparkVersion( + "delta: filter on TIMESTAMP_NTZ column should fallback and return correct results", + "3.4") { + withTable("delta_ntz_filter") { + spark.sql("CREATE TABLE delta_ntz_filter(id INT, ts TIMESTAMP_NTZ) USING DELTA") + spark.sql("""INSERT INTO delta_ntz_filter VALUES + |(1, '2022-01-01 00:00:00'), + |(2, '2023-01-01 00:00:00'), + |(3, '2024-01-01 00:00:00')""".stripMargin) + val df = runQueryAndCompare( + "select id from delta_ntz_filter where ts > '2022-06-01 00:00:00'", + noFallBack = false) { _ => } + checkAnswer(df, Seq(Row(2), Row(3))) + } + } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala index caa6ba16ba9a..1ec796518d7d 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleEx import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.hive.HiveTableScanExecTransformer +import org.apache.spark.sql.types._ object Validators { implicit class ValidatorBuilderImplicits(builder: Validator.Builder) { @@ -78,6 +79,11 @@ object Validators { builder.add(new FallbackByTestInjects()) } + /** Fails validation if a plan node's input or output schema contains TimestampNTZType. */ + def fallbackByTimestampNTZ(): Validator.Builder = { + builder.add(new FallbackByTimestampNTZ()) + } + /** * Fails validation on non-scan plan nodes if Gluten is running as scan-only mode. Also, passes * validation on filter for the exception that filter + scan is detected. Because filters can be @@ -212,6 +218,25 @@ object Validators { } } + private class FallbackByTimestampNTZ() extends Validator { + override def validate(plan: SparkPlan): Validator.OutCome = { + def containsNTZ(dataType: DataType): Boolean = dataType match { + case TimestampNTZType => true + case st: StructType => st.exists(f => containsNTZ(f.dataType)) + case at: ArrayType => containsNTZ(at.elementType) + case mt: MapType => containsNTZ(mt.keyType) || containsNTZ(mt.valueType) + case _ => false + } + val hasNTZ = plan.output.exists(a => containsNTZ(a.dataType)) || + plan.children.exists(_.output.exists(a => containsNTZ(a.dataType))) + if (hasNTZ) { + fail(s"${plan.nodeName} has TimestampNTZType in input/output schema") + } else { + pass() + } + } + } + private class FallbackIfScanOnlyWithFilterPushed(scanOnly: Boolean) extends Validator { override def validate(plan: SparkPlan): Validator.OutCome = { if (!scanOnly) { @@ -292,6 +317,7 @@ object Validators { .fallbackComplexExpressions() .fallbackByBackendSettings() .fallbackByUserOptions() + .fallbackByTimestampNTZ() .fallbackByTestInjects() .build() } From e77fa3a544736ce5c27401f0b9530a50acfb24a5 Mon Sep 17 00:00:00 2001 From: Ankita Victor-Levi Date: Thu, 12 Feb 2026 15:04:35 +0000 Subject: [PATCH 2/2] Fix compilation for 3.2/3.3 --- .../VeloxParquetDataTypeValidationSuite.scala | 2 +- .../org/apache/spark/sql/utils/SparkArrowUtil.scala | 13 +++++++++++-- .../extension/columnar/validator/Validators.scala | 4 ++-- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala index 645c4e3dc853..a5e814dc0fcb 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala @@ -465,7 +465,7 @@ class VeloxParquetDataTypeValidationSuite extends VeloxWholeStageTransformerSuit } } - test("Fallback for TimestampNTZ type scan") { + testWithMinSparkVersion("Fallback for TimestampNTZ type scan", "3.4") { withTempDir { dir => val path = new File(dir, "ntz_data").toURI.getPath diff --git a/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala b/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala index 619686be7c9b..0e2ce5e30931 100644 --- a/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala +++ b/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala @@ -50,7 +50,7 @@ object SparkArrowUtil { } else { new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC") } - case TimestampNTZType => + case dt if dt.catalogString == "timestamp_ntz" => new ArrowType.Timestamp(TimeUnit.MICROSECOND, null) case YearMonthIntervalType.DEFAULT => new ArrowType.Interval(IntervalUnit.YEAR_MONTH) @@ -75,7 +75,16 @@ object SparkArrowUtil { case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale) case date: ArrowType.Date if date.getUnit == DateUnit.DAY => DateType case ts: ArrowType.Timestamp if ts.getUnit == TimeUnit.MICROSECOND && ts.getTimezone == null => - TimestampNTZType + // TimestampNTZType is only available in Spark 3.4+ + try { + Class + .forName("org.apache.spark.sql.types.TimestampNTZType$") + .getField("MODULE$") + .get(null) + .asInstanceOf[DataType] + } catch { + case _: ClassNotFoundException => TimestampType + } case _: ArrowType.Timestamp => TimestampType case interval: ArrowType.Interval if interval.getUnit == IntervalUnit.YEAR_MONTH => YearMonthIntervalType.DEFAULT diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala index 1ec796518d7d..b20663093f75 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleEx import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.hive.HiveTableScanExecTransformer -import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} object Validators { implicit class ValidatorBuilderImplicits(builder: Validator.Builder) { @@ -221,7 +221,7 @@ object Validators { private class FallbackByTimestampNTZ() extends Validator { override def validate(plan: SparkPlan): Validator.OutCome = { def containsNTZ(dataType: DataType): Boolean = dataType match { - case TimestampNTZType => true + case dt if dt.catalogString == "timestamp_ntz" => true case st: StructType => st.exists(f => containsNTZ(f.dataType)) case at: ArrayType => containsNTZ(at.elementType) case mt: MapType => containsNTZ(mt.keyType) || containsNTZ(mt.valueType)