From 73c91562abc7f98e00d5e1e77d16c1e3a798a37d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 14 Jan 2026 14:30:07 -0700 Subject: [PATCH] feat: add support for last_day expression Adds native Comet support for Spark's last_day function, which returns the last day of the month for a given date. Uses the SparkLastDay implementation from datafusion-spark crate. Closes #3090 Co-Authored-By: Claude Opus 4.5 --- docs/source/user-guide/latest/configs.md | 1 + native/core/src/execution/jni_api.rs | 2 ++ .../apache/comet/serde/QueryPlanSerde.scala | 1 + .../org/apache/comet/serde/datetime.scala | 4 ++- .../comet/CometTemporalExpressionSuite.scala | 26 +++++++++++++++++++ 5 files changed, 33 insertions(+), 1 deletion(-) diff --git a/docs/source/user-guide/latest/configs.md b/docs/source/user-guide/latest/configs.md index 1a273ad033..5a2450444d 100644 --- a/docs/source/user-guide/latest/configs.md +++ b/docs/source/user-guide/latest/configs.md @@ -266,6 +266,7 @@ These settings can be used to determine which parts of the plan are accelerated | `spark.comet.expression.IsNull.enabled` | Enable Comet acceleration for `IsNull` | true | | `spark.comet.expression.JsonToStructs.enabled` | Enable Comet acceleration for `JsonToStructs` | true | | `spark.comet.expression.KnownFloatingPointNormalized.enabled` | Enable Comet acceleration for `KnownFloatingPointNormalized` | true | +| `spark.comet.expression.LastDay.enabled` | Enable Comet acceleration for `LastDay` | true | | `spark.comet.expression.Length.enabled` | Enable Comet acceleration for `Length` | true | | `spark.comet.expression.LessThan.enabled` | Enable Comet acceleration for `LessThan` | true | | `spark.comet.expression.LessThanOrEqual.enabled` | Enable Comet acceleration for `LessThanOrEqual` | true | diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 75c53198b8..56569bc69c 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -44,6 +44,7 @@ use datafusion_spark::function::bitwise::bit_get::SparkBitGet; use datafusion_spark::function::bitwise::bitwise_not::SparkBitwiseNot; use datafusion_spark::function::datetime::date_add::SparkDateAdd; use datafusion_spark::function::datetime::date_sub::SparkDateSub; +use datafusion_spark::function::datetime::last_day::SparkLastDay; use datafusion_spark::function::hash::sha1::SparkSha1; use datafusion_spark::function::hash::sha2::SparkSha2; use datafusion_spark::function::math::expm1::SparkExpm1; @@ -345,6 +346,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) { session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitGet::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkDateAdd::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkDateSub::default())); + session_ctx.register_udf(ScalarUDF::new_from_impl(SparkLastDay::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default())); session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default())); diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index e50b1d80e6..f68d79d9cc 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -187,6 +187,7 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[DateAdd] -> CometDateAdd, classOf[DateSub] -> CometDateSub, classOf[FromUnixTime] -> CometFromUnixTime, + classOf[LastDay] -> CometLastDay, classOf[Hour] -> CometHour, classOf[Minute] -> CometMinute, classOf[Second] -> CometSecond, diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index ef2b0f793c..628e190908 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -21,7 +21,7 @@ package org.apache.comet.serde import java.util.Locale -import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, WeekDay, WeekOfYear, Year} +import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, Minute, Month, Quarter, Second, TruncDate, TruncTimestamp, WeekDay, WeekOfYear, Year} import org.apache.spark.sql.types.{DateType, IntegerType} import org.apache.spark.unsafe.types.UTF8String @@ -258,6 +258,8 @@ object CometDateAdd extends CometScalarFunction[DateAdd]("date_add") object CometDateSub extends CometScalarFunction[DateSub]("date_sub") +object CometLastDay extends CometScalarFunction[LastDay]("last_day") + object CometTruncDate extends CometExpressionSerde[TruncDate] { val supportedFormats: Seq[String] = diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala index 9a23c76d82..53ffd483d7 100644 --- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala @@ -122,4 +122,30 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH StructField("fmt", DataTypes.StringType, true))) FuzzDataGenerator.generateDataFrame(r, spark, schema, 1000, DataGenOptions()) } + + test("last_day") { + val r = new Random(42) + val schema = StructType(Seq(StructField("c0", DataTypes.DateType, true))) + val df = FuzzDataGenerator.generateDataFrame(r, spark, schema, 1000, DataGenOptions()) + df.createOrReplaceTempView("tbl") + + // Basic test with random dates + checkSparkAnswerAndOperator("SELECT c0, last_day(c0) FROM tbl ORDER BY c0") + + // Disable constant folding to ensure literal expressions are executed by Comet + withSQLConf( + SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> + "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") { + // Test with literal dates - various months + checkSparkAnswerAndOperator( + "SELECT last_day(DATE('2024-01-15')), last_day(DATE('2024-02-15')), last_day(DATE('2024-12-01'))") + + // Test leap year handling (February) + checkSparkAnswerAndOperator( + "SELECT last_day(DATE('2024-02-01')), last_day(DATE('2023-02-01'))") + + // Test null handling + checkSparkAnswerAndOperator("SELECT last_day(NULL)") + } + } }