From d12bc787b14a820ce314c1be2d0a03d82018b1a1 Mon Sep 17 00:00:00 2001 From: "nimesh.k" Date: Wed, 27 Aug 2025 17:42:08 +0000 Subject: [PATCH 01/15] Add Spark unix_timestamp support with timestamp and format arguments --- .../expression/ExpressionConverter.scala | 19 +++++++++++++++---- .../spark/sql/GlutenDateFunctionsSuite.scala | 16 ++++++++++++++++ 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index fc1de383d051..16d2315afb1c 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -279,12 +279,23 @@ object ExpressionConverter extends SQLConfHelper with Logging { case t: ToUnixTimestamp => // The failOnError depends on the config for ANSI. ANSI is not supported currently. // And timeZoneId is passed to backend config. + // For timestamp and date inputs, the format parameter is ignored as per Spark behavior. + val timeExpTransformer = replaceWithExpressionTransformer0( + t.timeExp, attributeSeq, expressionsMap) + val children = t.timeExp.dataType match { + case _: TimestampType | _: TimestampNTZType | _: DateType => + // For timestamp/date input, format is ignored - only pass timeExp + Seq(timeExpTransformer) + case _ => + // For string input, format is used - pass both timeExp and format + Seq( + timeExpTransformer, + replaceWithExpressionTransformer0(t.format, attributeSeq, expressionsMap) + ) + } GenericExpressionTransformer( substraitExprName, - Seq( - replaceWithExpressionTransformer0(t.timeExp, attributeSeq, expressionsMap), - replaceWithExpressionTransformer0(t.format, attributeSeq, expressionsMap) - ), + children, t ) case u: UnixTimestamp => diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDateFunctionsSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDateFunctionsSuite.scala index f9c5995cafdd..082f06641b5a 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDateFunctionsSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDateFunctionsSuite.scala @@ -125,6 +125,14 @@ class GlutenDateFunctionsSuite extends DateFunctionsSuite with GlutenSQLTestsTra df2.select(unix_timestamp(col("y"), "yyyy-MM-dd")), Seq(Row(secs(ts5.getTime)), Row(null))) + // Test unix_timestamp(timestamp, format) - format should be ignored + checkAnswer( + df.select(unix_timestamp(col("ts"), "yyyy-MM-dd")), + Seq(Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer( + df.selectExpr("unix_timestamp(ts, 'invalid-format')"), + Seq(Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + val now = sql("select unix_timestamp()").collect().head.getLong(0) checkAnswer( sql(s"select timestamp_seconds($now)"), @@ -187,6 +195,14 @@ class GlutenDateFunctionsSuite extends DateFunctionsSuite with GlutenSQLTestsTra df2.select(unix_timestamp(col("y"), "yyyy-MM-dd")), Seq(Row(secs(ts5.getTime)), Row(null))) + // Test to_unix_timestamp(timestamp, format) - format should be ignored + checkAnswer( + df.selectExpr("to_unix_timestamp(ts, 'yyyy-MM-dd')"), + Seq(Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer( + df.selectExpr("to_unix_timestamp(ts, 'invalid-format')"), + Seq(Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + val invalid = df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd bb:HH:ss')") checkAnswer(invalid, Seq(Row(null), Row(null), Row(null), Row(null))) } From d31f23250b1e3a28195fee22da1fc7e9ecfb10ae Mon Sep 17 00:00:00 2001 From: "nimesh.k" Date: Wed, 27 Aug 2025 18:13:17 +0000 Subject: [PATCH 02/15] form at fix --- .../org/apache/gluten/expression/ExpressionConverter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index 16d2315afb1c..58e17d25eca8 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -280,8 +280,8 @@ object ExpressionConverter extends SQLConfHelper with Logging { // The failOnError depends on the config for ANSI. ANSI is not supported currently. // And timeZoneId is passed to backend config. // For timestamp and date inputs, the format parameter is ignored as per Spark behavior. - val timeExpTransformer = replaceWithExpressionTransformer0( - t.timeExp, attributeSeq, expressionsMap) + val timeExpTransformer = + replaceWithExpressionTransformer0(t.timeExp, attributeSeq, expressionsMap) val children = t.timeExp.dataType match { case _: TimestampType | _: TimestampNTZType | _: DateType => // For timestamp/date input, format is ignored - only pass timeExp From 38377ac1cede635c97537eb247984921cf912df8 Mon Sep 17 00:00:00 2001 From: "nimesh.k" Date: Wed, 27 Aug 2025 18:20:38 +0000 Subject: [PATCH 03/15] form at fix --- .../org/apache/gluten/expression/ExpressionConverter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index 58e17d25eca8..0302e656d2dc 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -283,7 +283,7 @@ object ExpressionConverter extends SQLConfHelper with Logging { val timeExpTransformer = replaceWithExpressionTransformer0(t.timeExp, attributeSeq, expressionsMap) val children = t.timeExp.dataType match { - case _: TimestampType | _: TimestampNTZType | _: DateType => + case _: TimestampType | _: DateType => // For timestamp/date input, format is ignored - only pass timeExp Seq(timeExpTransformer) case _ => From bd08b7d39f9bbfbc89847476947834117e8b2b77 Mon Sep 17 00:00:00 2001 From: "nimesh.k" Date: Thu, 28 Aug 2025 14:39:53 +0000 Subject: [PATCH 04/15] format fix --- .../execution/VeloxRDDScanTransformer.scala | 64 +++++++++++++++++++ .../expression/ExpressionConverter.scala | 39 +++++++---- 2 files changed, 92 insertions(+), 11 deletions(-) create mode 100644 backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxRDDScanTransformer.scala diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxRDDScanTransformer.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxRDDScanTransformer.scala new file mode 100644 index 000000000000..fefe1424a06a --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxRDDScanTransformer.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.gluten.execution.ValidationResult +import org.apache.gluten.vectorized.VeloxColumnarBatch + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.vectorized.ColumnarBatch + +case class VeloxRDDScanTransformer( + outputAttributes: Seq[Attribute], + rdd: RDD[InternalRow], + name: String, + override val outputPartitioning: Partitioning = UnknownPartitioning(0), + override val outputOrdering: Seq[SortOrder] +) extends RDDScanTransformer(outputAttributes, outputPartitioning, outputOrdering) { + + override protected def doValidateInternal(): ValidationResult = { + ValidationResult.succeeded + } + + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + rdd.mapPartitions { it => + if (it.hasNext) { + val rows = it.toArray + val batch = VeloxColumnarBatch.from(schema, rows) + Iterator.single(batch) + } else { + Iterator.empty + } + } + } + + override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = + copy(outputAttributes, rdd, name, outputPartitioning, outputOrdering) +} + +object VeloxRDDScanTransformer { + def replace(rdd: RDDScanExec): RDDScanTransformer = + VeloxRDDScanTransformer( + rdd.output, + rdd.inputRDD, + rdd.nodeName, + rdd.outputPartitioning, + rdd.outputOrdering) +} \ No newline at end of file diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index 0302e656d2dc..b51ecc675780 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -19,6 +19,8 @@ package org.apache.gluten.expression import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.sql.shims.SparkShimLoader +import org.apache.gluten.substrait.SubstraitContext +import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode} import org.apache.gluten.test.TestStats import org.apache.gluten.utils.DecimalArithmeticUtil @@ -33,6 +35,7 @@ import org.apache.spark.sql.hive.HiveUDFTransformer import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer trait Transformable { @@ -282,22 +285,36 @@ object ExpressionConverter extends SQLConfHelper with Logging { // For timestamp and date inputs, the format parameter is ignored as per Spark behavior. val timeExpTransformer = replaceWithExpressionTransformer0(t.timeExp, attributeSeq, expressionsMap) - val children = t.timeExp.dataType match { - case _: TimestampType | _: DateType => - // For timestamp/date input, format is ignored - only pass timeExp - Seq(timeExpTransformer) + t.timeExp.dataType match { + case _: TimestampType => + // For timestamp input, use unix_seconds with custom signature + new ExpressionTransformer { + override def substraitExprName: String = "unix_seconds" + override def children: Seq[ExpressionTransformer] = Seq(timeExpTransformer) + override def original: Expression = t + override def doTransform(context: SubstraitContext): ExpressionNode = { + val funcName: String = ConverterUtils.makeFuncName( + substraitExprName, + Seq(t.timeExp.dataType) + ) + val functionId = context.registerFunction(funcName) + val childNodes = children.map(_.doTransform(context)).asJava + val typeNode = ConverterUtils.getTypeNode(dataType, nullable) + ExpressionBuilder.makeScalarFunction(functionId, childNodes, typeNode) + } + } case _ => - // For string input, format is used - pass both timeExp and format - Seq( + // For other inputs (date, string), use original logic + val children = Seq( timeExpTransformer, replaceWithExpressionTransformer0(t.format, attributeSeq, expressionsMap) ) + GenericExpressionTransformer( + substraitExprName, + children, + t + ) } - GenericExpressionTransformer( - substraitExprName, - children, - t - ) case u: UnixTimestamp => GenericExpressionTransformer( substraitExprName, From f6a3eb16b647364d38a4a53aa0bf77c6b14f443b Mon Sep 17 00:00:00 2001 From: "nimesh.k" Date: Thu, 28 Aug 2025 15:00:47 +0000 Subject: [PATCH 05/15] adding transformer --- .../DateFunctionsValidateSuite.scala | 17 ++++++++++++ .../DateTimeExpressionsTransformer.scala | 26 +++++++++++++++++++ .../expression/ExpressionConverter.scala | 22 +++++----------- 3 files changed, 49 insertions(+), 16 deletions(-) diff --git a/backends-velox/src/test/scala/org/apache/gluten/functions/DateFunctionsValidateSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/functions/DateFunctionsValidateSuite.scala index df761ce2dff1..81e1457021e5 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/functions/DateFunctionsValidateSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/functions/DateFunctionsValidateSuite.scala @@ -471,4 +471,21 @@ abstract class DateFunctionsValidateSuite extends FunctionsValidateSuite { } } } + + test("unix_timestamp with timestamp and format - no fallback") { + withTempPath { + path => + Seq( + (Timestamp.valueOf("2016-04-08 13:10:15"), "yyyy-MM-dd"), + (Timestamp.valueOf("2017-05-19 18:25:30"), "MM/dd/yyyy") + ).toDF("ts", "fmt").write.parquet(path.getCanonicalPath) + + spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("unix_timestamp_test") + + // Test unix_timestamp(timestamp, format) - should use native execution without fallback + runQueryAndCompare("SELECT unix_timestamp(ts, fmt) FROM unix_timestamp_test") { + checkGlutenOperatorMatch[ProjectExecTransformer] + } + } + } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/DateTimeExpressionsTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/DateTimeExpressionsTransformer.scala index 2e2611d5abef..abd630ee3000 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/DateTimeExpressionsTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/DateTimeExpressionsTransformer.scala @@ -17,8 +17,13 @@ package org.apache.gluten.expression import org.apache.gluten.exception.GlutenNotSupportException +import org.apache.gluten.substrait.SubstraitContext +import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode} import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.TimestampType + +import scala.collection.JavaConverters._ /** The extract trait for 'GetDateField' from Date */ case class ExtractDateTransformer( @@ -102,3 +107,24 @@ object DateTimeExpressionsTransformer { scala.reflect.classTag[Second].runtimeClass -> "SECOND" ) } + +case class ToUnixTimestampTransformer( + substraitExprName: String, + timeExp: ExpressionTransformer, + original: ToUnixTimestamp) + extends ExpressionTransformer { + + override def children: Seq[ExpressionTransformer] = Seq(timeExp) + + override def doTransform(context: SubstraitContext): ExpressionNode = { + // Generate signature with only timestamp argument + val funcName: String = ConverterUtils.makeFuncName( + substraitExprName, // "to_unix_timestamp" + Seq(original.timeExp.dataType) // Only timestamp type + ) + val functionId = context.registerFunction(funcName) + val childNodes = children.map(_.doTransform(context)).asJava + val typeNode = ConverterUtils.getTypeNode(dataType, nullable) + ExpressionBuilder.makeScalarFunction(functionId, childNodes, typeNode) + } +} diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index b51ecc675780..26a6628bab69 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -287,22 +287,12 @@ object ExpressionConverter extends SQLConfHelper with Logging { replaceWithExpressionTransformer0(t.timeExp, attributeSeq, expressionsMap) t.timeExp.dataType match { case _: TimestampType => - // For timestamp input, use unix_seconds with custom signature - new ExpressionTransformer { - override def substraitExprName: String = "unix_seconds" - override def children: Seq[ExpressionTransformer] = Seq(timeExpTransformer) - override def original: Expression = t - override def doTransform(context: SubstraitContext): ExpressionNode = { - val funcName: String = ConverterUtils.makeFuncName( - substraitExprName, - Seq(t.timeExp.dataType) - ) - val functionId = context.registerFunction(funcName) - val childNodes = children.map(_.doTransform(context)).asJava - val typeNode = ConverterUtils.getTypeNode(dataType, nullable) - ExpressionBuilder.makeScalarFunction(functionId, childNodes, typeNode) - } - } + // For timestamp input, use custom transformer to generate correct signature + ToUnixTimestampTransformer( + substraitExprName, + timeExpTransformer, + t + ) case _ => // For other inputs (date, string), use original logic val children = Seq( From 5aaec829225ed6e465c78d9ddafff425c68dfed0 Mon Sep 17 00:00:00 2001 From: Nimesh Khandelwal Date: Thu, 28 Aug 2025 21:06:53 +0530 Subject: [PATCH 06/15] fix format --- .../execution/VeloxRDDScanTransformer.scala | 64 ------------------- .../DateTimeExpressionsTransformer.scala | 1 - .../expression/ExpressionConverter.scala | 3 - 3 files changed, 68 deletions(-) delete mode 100644 backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxRDDScanTransformer.scala diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxRDDScanTransformer.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxRDDScanTransformer.scala deleted file mode 100644 index fefe1424a06a..000000000000 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxRDDScanTransformer.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution - -import org.apache.gluten.execution.ValidationResult -import org.apache.gluten.vectorized.VeloxColumnarBatch - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} -import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} -import org.apache.spark.sql.vectorized.ColumnarBatch - -case class VeloxRDDScanTransformer( - outputAttributes: Seq[Attribute], - rdd: RDD[InternalRow], - name: String, - override val outputPartitioning: Partitioning = UnknownPartitioning(0), - override val outputOrdering: Seq[SortOrder] -) extends RDDScanTransformer(outputAttributes, outputPartitioning, outputOrdering) { - - override protected def doValidateInternal(): ValidationResult = { - ValidationResult.succeeded - } - - override def doExecuteColumnar(): RDD[ColumnarBatch] = { - rdd.mapPartitions { it => - if (it.hasNext) { - val rows = it.toArray - val batch = VeloxColumnarBatch.from(schema, rows) - Iterator.single(batch) - } else { - Iterator.empty - } - } - } - - override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = - copy(outputAttributes, rdd, name, outputPartitioning, outputOrdering) -} - -object VeloxRDDScanTransformer { - def replace(rdd: RDDScanExec): RDDScanTransformer = - VeloxRDDScanTransformer( - rdd.output, - rdd.inputRDD, - rdd.nodeName, - rdd.outputPartitioning, - rdd.outputOrdering) -} \ No newline at end of file diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/DateTimeExpressionsTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/DateTimeExpressionsTransformer.scala index abd630ee3000..be3fdb6a8329 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/DateTimeExpressionsTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/DateTimeExpressionsTransformer.scala @@ -21,7 +21,6 @@ import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.TimestampType import scala.collection.JavaConverters._ diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index 26a6628bab69..70dd7b3a0dd8 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -19,8 +19,6 @@ package org.apache.gluten.expression import org.apache.gluten.backendsapi.BackendsApiManager import org.apache.gluten.exception.GlutenNotSupportException import org.apache.gluten.sql.shims.SparkShimLoader -import org.apache.gluten.substrait.SubstraitContext -import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode} import org.apache.gluten.test.TestStats import org.apache.gluten.utils.DecimalArithmeticUtil @@ -35,7 +33,6 @@ import org.apache.spark.sql.hive.HiveUDFTransformer import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer trait Transformable { From 96e4026ae911d5ec961a0449025e83b056836904 Mon Sep 17 00:00:00 2001 From: "nimesh.k" Date: Thu, 28 Aug 2025 16:25:10 +0000 Subject: [PATCH 07/15] adding transformer --- .../expression/ExpressionConverter.scala | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index 70dd7b3a0dd8..b6f5536953c4 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -303,14 +303,28 @@ object ExpressionConverter extends SQLConfHelper with Logging { ) } case u: UnixTimestamp => - GenericExpressionTransformer( - substraitExprName, - Seq( - replaceWithExpressionTransformer0(u.timeExp, attributeSeq, expressionsMap), - replaceWithExpressionTransformer0(u.format, attributeSeq, expressionsMap) - ), - ToUnixTimestamp(u.timeExp, u.format, u.timeZoneId, u.failOnError) - ) + val toUnixTimestamp = ToUnixTimestamp(u.timeExp, u.format, u.timeZoneId, u.failOnError) + val timeExpTransformer = replaceWithExpressionTransformer0(u.timeExp, attributeSeq, expressionsMap) + + u.timeExp.dataType match { + case _: TimestampType => + // For timestamp input, use custom transformer to generate correct signature + ToUnixTimestampTransformer( + substraitExprName, + timeExpTransformer, + toUnixTimestamp + ) + case _ => + // For other inputs (date, string), use original logic + GenericExpressionTransformer( + substraitExprName, + Seq( + timeExpTransformer, + replaceWithExpressionTransformer0(u.format, attributeSeq, expressionsMap) + ), + toUnixTimestamp + ) + } case t: TruncTimestamp => BackendsApiManager.getSparkPlanExecApiInstance.genTruncTimestampTransformer( substraitExprName, From fe09ceed6ff1254d3981dcac35714c187db2c4fa Mon Sep 17 00:00:00 2001 From: "nimesh.k" Date: Thu, 28 Aug 2025 16:38:35 +0000 Subject: [PATCH 08/15] adding transformer --- .../org/apache/gluten/expression/ExpressionConverter.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index b6f5536953c4..5911f3f14809 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -304,8 +304,9 @@ object ExpressionConverter extends SQLConfHelper with Logging { } case u: UnixTimestamp => val toUnixTimestamp = ToUnixTimestamp(u.timeExp, u.format, u.timeZoneId, u.failOnError) - val timeExpTransformer = replaceWithExpressionTransformer0(u.timeExp, attributeSeq, expressionsMap) - + val timeExpTransformer = replaceWithExpressionTransformer0( + u.timeExp, attributeSeq, expressionsMap) + u.timeExp.dataType match { case _: TimestampType => // For timestamp input, use custom transformer to generate correct signature From d7ef6f1e0a08f796ec90869597cac20fa15c73bf Mon Sep 17 00:00:00 2001 From: "nimesh.k" Date: Thu, 28 Aug 2025 16:52:12 +0000 Subject: [PATCH 09/15] formatting --- .../org/apache/gluten/expression/ExpressionConverter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index 5911f3f14809..f4847346f2ec 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -304,8 +304,8 @@ object ExpressionConverter extends SQLConfHelper with Logging { } case u: UnixTimestamp => val toUnixTimestamp = ToUnixTimestamp(u.timeExp, u.format, u.timeZoneId, u.failOnError) - val timeExpTransformer = replaceWithExpressionTransformer0( - u.timeExp, attributeSeq, expressionsMap) + val timeExpTransformer = + replaceWithExpressionTransformer0(u.timeExp, attributeSeq, expressionsMap) u.timeExp.dataType match { case _: TimestampType => From 2344fc3574f8655da0c92732899a1690ef0403f0 Mon Sep 17 00:00:00 2001 From: "nimesh.k" Date: Thu, 28 Aug 2025 19:05:11 +0000 Subject: [PATCH 10/15] refactor --- .../expression/ExpressionConverter.scala | 40 +++++-------------- 1 file changed, 11 insertions(+), 29 deletions(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index f4847346f2ec..1a9df9375334 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -276,38 +276,20 @@ object ExpressionConverter extends SQLConfHelper with Logging { substraitExprName, replaceWithExpressionTransformer0(r.child, attributeSeq, expressionsMap), r) - case t: ToUnixTimestamp => + case expr @ (_: ToUnixTimestamp | _: UnixTimestamp) => + // Extract common fields - both ToUnixTimestamp and UnixTimestamp have the same fields + val (timeExp, format, timeZoneId, failOnError) = expr match { + case t: ToUnixTimestamp => (t.timeExp, t.format, t.timeZoneId, t.failOnError) + case u: UnixTimestamp => (u.timeExp, u.format, u.timeZoneId, u.failOnError) + } // The failOnError depends on the config for ANSI. ANSI is not supported currently. // And timeZoneId is passed to backend config. - // For timestamp and date inputs, the format parameter is ignored as per Spark behavior. - val timeExpTransformer = - replaceWithExpressionTransformer0(t.timeExp, attributeSeq, expressionsMap) - t.timeExp.dataType match { - case _: TimestampType => - // For timestamp input, use custom transformer to generate correct signature - ToUnixTimestampTransformer( - substraitExprName, - timeExpTransformer, - t - ) - case _ => - // For other inputs (date, string), use original logic - val children = Seq( - timeExpTransformer, - replaceWithExpressionTransformer0(t.format, attributeSeq, expressionsMap) - ) - GenericExpressionTransformer( - substraitExprName, - children, - t - ) - } - case u: UnixTimestamp => - val toUnixTimestamp = ToUnixTimestamp(u.timeExp, u.format, u.timeZoneId, u.failOnError) + val toUnixTimestamp = ToUnixTimestamp(timeExp, format, timeZoneId, failOnError) val timeExpTransformer = - replaceWithExpressionTransformer0(u.timeExp, attributeSeq, expressionsMap) + replaceWithExpressionTransformer0(timeExp, attributeSeq, expressionsMap) - u.timeExp.dataType match { + // For timestamp and date inputs, the format parameter is ignored as per Spark behavior. + timeExp.dataType match { case _: TimestampType => // For timestamp input, use custom transformer to generate correct signature ToUnixTimestampTransformer( @@ -321,7 +303,7 @@ object ExpressionConverter extends SQLConfHelper with Logging { substraitExprName, Seq( timeExpTransformer, - replaceWithExpressionTransformer0(u.format, attributeSeq, expressionsMap) + replaceWithExpressionTransformer0(format, attributeSeq, expressionsMap) ), toUnixTimestamp ) From 33035f93ef19e9f0cce408562f92b2e66e0e9e10 Mon Sep 17 00:00:00 2001 From: "nimesh.k" Date: Mon, 1 Sep 2025 15:37:41 +0000 Subject: [PATCH 11/15] code suggestion --- .../DateTimeExpressionsTransformer.scala | 23 ++++------ .../expression/ExpressionConverter.scala | 44 ++++++------------- 2 files changed, 23 insertions(+), 44 deletions(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/DateTimeExpressionsTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/DateTimeExpressionsTransformer.scala index be3fdb6a8329..bccd1254fe0d 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/DateTimeExpressionsTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/DateTimeExpressionsTransformer.scala @@ -21,6 +21,7 @@ import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode} import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.TimestampType import scala.collection.JavaConverters._ @@ -109,21 +110,15 @@ object DateTimeExpressionsTransformer { case class ToUnixTimestampTransformer( substraitExprName: String, - timeExp: ExpressionTransformer, - original: ToUnixTimestamp) + timeExpTransformer: ExpressionTransformer, + formatTransformer: ExpressionTransformer, + original: Expression) extends ExpressionTransformer { - override def children: Seq[ExpressionTransformer] = Seq(timeExp) - - override def doTransform(context: SubstraitContext): ExpressionNode = { - // Generate signature with only timestamp argument - val funcName: String = ConverterUtils.makeFuncName( - substraitExprName, // "to_unix_timestamp" - Seq(original.timeExp.dataType) // Only timestamp type - ) - val functionId = context.registerFunction(funcName) - val childNodes = children.map(_.doTransform(context)).asJava - val typeNode = ConverterUtils.getTypeNode(dataType, nullable) - ExpressionBuilder.makeScalarFunction(functionId, childNodes, typeNode) + override def children: Seq[ExpressionTransformer] = { + timeExpTransformer.dataType match { + case _: TimestampType => Seq(timeExpTransformer) + case _ => Seq(timeExpTransformer, formatTransformer) + } } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index 1a9df9375334..d41c5470bc68 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -276,38 +276,22 @@ object ExpressionConverter extends SQLConfHelper with Logging { substraitExprName, replaceWithExpressionTransformer0(r.child, attributeSeq, expressionsMap), r) - case expr @ (_: ToUnixTimestamp | _: UnixTimestamp) => - // Extract common fields - both ToUnixTimestamp and UnixTimestamp have the same fields - val (timeExp, format, timeZoneId, failOnError) = expr match { - case t: ToUnixTimestamp => (t.timeExp, t.format, t.timeZoneId, t.failOnError) - case u: UnixTimestamp => (u.timeExp, u.format, u.timeZoneId, u.failOnError) - } + case t: ToUnixTimestamp => // The failOnError depends on the config for ANSI. ANSI is not supported currently. // And timeZoneId is passed to backend config. - val toUnixTimestamp = ToUnixTimestamp(timeExp, format, timeZoneId, failOnError) - val timeExpTransformer = - replaceWithExpressionTransformer0(timeExp, attributeSeq, expressionsMap) - - // For timestamp and date inputs, the format parameter is ignored as per Spark behavior. - timeExp.dataType match { - case _: TimestampType => - // For timestamp input, use custom transformer to generate correct signature - ToUnixTimestampTransformer( - substraitExprName, - timeExpTransformer, - toUnixTimestamp - ) - case _ => - // For other inputs (date, string), use original logic - GenericExpressionTransformer( - substraitExprName, - Seq( - timeExpTransformer, - replaceWithExpressionTransformer0(format, attributeSeq, expressionsMap) - ), - toUnixTimestamp - ) - } + ToUnixTimestampTransformer( + substraitExprName, + replaceWithExpressionTransformer0(t.timeExp, attributeSeq, expressionsMap), + replaceWithExpressionTransformer0(t.format, attributeSeq, expressionsMap), + t + ) + case u: UnixTimestamp => + ToUnixTimestampTransformer( + substraitExprName, + replaceWithExpressionTransformer0(u.timeExp, attributeSeq, expressionsMap), + replaceWithExpressionTransformer0(u.format, attributeSeq, expressionsMap), + u + ) case t: TruncTimestamp => BackendsApiManager.getSparkPlanExecApiInstance.genTruncTimestampTransformer( substraitExprName, From ca91650edf9439dda5485951beec49e1567ea1b3 Mon Sep 17 00:00:00 2001 From: "nimesh.k" Date: Mon, 1 Sep 2025 15:45:59 +0000 Subject: [PATCH 12/15] code suggestion --- .../gluten/expression/DateTimeExpressionsTransformer.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/DateTimeExpressionsTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/DateTimeExpressionsTransformer.scala index bccd1254fe0d..2d9412dd07a1 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/DateTimeExpressionsTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/DateTimeExpressionsTransformer.scala @@ -17,14 +17,10 @@ package org.apache.gluten.expression import org.apache.gluten.exception.GlutenNotSupportException -import org.apache.gluten.substrait.SubstraitContext -import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.TimestampType -import scala.collection.JavaConverters._ - /** The extract trait for 'GetDateField' from Date */ case class ExtractDateTransformer( substraitExprName: String, From 9f1242723df941fd5843756ab99600ddedd2c6f7 Mon Sep 17 00:00:00 2001 From: "nimesh.k" Date: Tue, 2 Sep 2025 11:29:24 +0000 Subject: [PATCH 13/15] have seperate transfomer --- .../backendsapi/clickhouse/CHSparkPlanExecApi.scala | 12 ++++++++++++ .../backendsapi/velox/VeloxSparkPlanExecApi.scala | 8 ++++++++ .../apache/gluten/backendsapi/SparkPlanExecApi.scala | 12 ++++++++++++ .../gluten/expression/ExpressionConverter.scala | 6 ++---- 4 files changed, 34 insertions(+), 4 deletions(-) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index 2af5c3d71b26..129f2580164c 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -636,6 +636,18 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { CHTruncTimestampTransformer(substraitExprName, format, timestamp, timeZoneId, original) } + override def genToUnixTimestampTransformer( + substraitExprName: String, + timeExp: ExpressionTransformer, + format: ExpressionTransformer, + original: Expression): ExpressionTransformer = { + GenericExpressionTransformer( + substraitExprName, + Seq(timeExp, format), + original + ) + } + override def genDateDiffTransformer( substraitExprName: String, endDate: ExpressionTransformer, diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index c3ac63f76767..0c90d3ae2f2e 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -1048,4 +1048,12 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { } TimestampDiffTransformer(substraitExprName, extract.get, left, right, original) } + + override def genToUnixTimestampTransformer( + substraitExprName: String, + timeExp: ExpressionTransformer, + format: ExpressionTransformer, + original: Expression): ExpressionTransformer = { + ToUnixTimestampTransformer(substraitExprName, timeExp, format, original) + } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala index 1625cce062e9..2b435d6f0bf9 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala @@ -470,6 +470,18 @@ trait SparkPlanExecApi { TruncTimestampTransformer(substraitExprName, format, timestamp, original) } + def genToUnixTimestampTransformer( + substraitExprName: String, + timeExp: ExpressionTransformer, + format: ExpressionTransformer, + original: Expression): ExpressionTransformer = { + GenericExpressionTransformer( + substraitExprName, + Seq(timeExp, format), + original + ) + } + def genDateDiffTransformer( substraitExprName: String, endDate: ExpressionTransformer, diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala index d41c5470bc68..4d62ae18048f 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala @@ -277,16 +277,14 @@ object ExpressionConverter extends SQLConfHelper with Logging { replaceWithExpressionTransformer0(r.child, attributeSeq, expressionsMap), r) case t: ToUnixTimestamp => - // The failOnError depends on the config for ANSI. ANSI is not supported currently. - // And timeZoneId is passed to backend config. - ToUnixTimestampTransformer( + BackendsApiManager.getSparkPlanExecApiInstance.genToUnixTimestampTransformer( substraitExprName, replaceWithExpressionTransformer0(t.timeExp, attributeSeq, expressionsMap), replaceWithExpressionTransformer0(t.format, attributeSeq, expressionsMap), t ) case u: UnixTimestamp => - ToUnixTimestampTransformer( + BackendsApiManager.getSparkPlanExecApiInstance.genToUnixTimestampTransformer( substraitExprName, replaceWithExpressionTransformer0(u.timeExp, attributeSeq, expressionsMap), replaceWithExpressionTransformer0(u.format, attributeSeq, expressionsMap), From 14eff5fc6855313e25ffd3a0f68a1815d5c18b4d Mon Sep 17 00:00:00 2001 From: "nimesh.k" Date: Tue, 2 Sep 2025 14:01:09 +0000 Subject: [PATCH 14/15] move to velox --- .../expression/ExpressionTransformer.scala | 17 ++++++++++++++++- .../DateTimeExpressionsTransformer.scala | 16 ---------------- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/expression/ExpressionTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/expression/ExpressionTransformer.scala index a5e77920e485..7567a35663c8 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/expression/ExpressionTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/expression/ExpressionTransformer.scala @@ -23,7 +23,7 @@ import org.apache.gluten.substrait.SubstraitContext import org.apache.gluten.substrait.expression._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{IntegerType, LongType} +import org.apache.spark.sql.types.{IntegerType, LongType, TimestampType} import java.lang.{Integer => JInteger} import java.util.{ArrayList => JArrayList} @@ -109,3 +109,18 @@ case class VeloxHashExpressionTransformer( ExpressionBuilder.makeScalarFunction(functionId, nodes, typeNode) } } + +case class ToUnixTimestampTransformer( + substraitExprName: String, + timeExpTransformer: ExpressionTransformer, + formatTransformer: ExpressionTransformer, + original: Expression) + extends ExpressionTransformer { + + override def children: Seq[ExpressionTransformer] = { + timeExpTransformer.dataType match { + case _: TimestampType => Seq(timeExpTransformer) + case _ => Seq(timeExpTransformer, formatTransformer) + } + } +} diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/DateTimeExpressionsTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/DateTimeExpressionsTransformer.scala index 2d9412dd07a1..2e2611d5abef 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/DateTimeExpressionsTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/DateTimeExpressionsTransformer.scala @@ -19,7 +19,6 @@ package org.apache.gluten.expression import org.apache.gluten.exception.GlutenNotSupportException import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.TimestampType /** The extract trait for 'GetDateField' from Date */ case class ExtractDateTransformer( @@ -103,18 +102,3 @@ object DateTimeExpressionsTransformer { scala.reflect.classTag[Second].runtimeClass -> "SECOND" ) } - -case class ToUnixTimestampTransformer( - substraitExprName: String, - timeExpTransformer: ExpressionTransformer, - formatTransformer: ExpressionTransformer, - original: Expression) - extends ExpressionTransformer { - - override def children: Seq[ExpressionTransformer] = { - timeExpTransformer.dataType match { - case _: TimestampType => Seq(timeExpTransformer) - case _ => Seq(timeExpTransformer, formatTransformer) - } - } -} From 3287efef6ebfc2af48ba633286a813af314f342b Mon Sep 17 00:00:00 2001 From: "nimesh.k" Date: Wed, 3 Sep 2025 09:38:00 +0000 Subject: [PATCH 15/15] abstract --- .../org/apache/gluten/backendsapi/SparkPlanExecApi.scala | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala index 2b435d6f0bf9..58ad9d3e4bc9 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala @@ -474,13 +474,7 @@ trait SparkPlanExecApi { substraitExprName: String, timeExp: ExpressionTransformer, format: ExpressionTransformer, - original: Expression): ExpressionTransformer = { - GenericExpressionTransformer( - substraitExprName, - Seq(timeExp, format), - original - ) - } + original: Expression): ExpressionTransformer def genDateDiffTransformer( substraitExprName: String,