From 1f68bdf58c39dff8366a43d1aa012c05be5a0ee3 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Wed, 6 Aug 2025 13:11:38 +0200 Subject: [PATCH 1/3] test --- .../apache/comet/serde/QueryPlanSerde.scala | 4 +- .../apache/comet/shims/CometExprShim.scala | 6 +++ .../apache/comet/shims/CometExprShim.scala | 6 +++ .../apache/comet/shims/CometExprShim.scala | 44 +++++++++++++++++++ .../org/apache/comet/CometFuzzTestSuite.scala | 2 - 5 files changed, 58 insertions(+), 4 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index a184bc94ce..0c5616eb14 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -630,7 +630,7 @@ object QueryPlanSerde extends Logging with CometExprShim { } } - expr match { + versionSpecificExprToProtoInternal(expr, inputs, binding).orElse(expr match { case a @ Alias(_, _) => val r = exprToProtoInternal(a.child, inputs, binding) if (r.isEmpty) { @@ -1679,7 +1679,7 @@ object QueryPlanSerde extends Logging with CometExprShim { withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*) None } - } + }) } /** diff --git a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala index 5f4e3fba2b..60475d5fa2 100644 --- a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala @@ -19,6 +19,7 @@ package org.apache.comet.shims import org.apache.comet.expressions.CometEvalMode +import org.apache.comet.serde.ExprOuterClass.Expr import org.apache.spark.sql.catalyst.expressions._ /** @@ -34,6 +35,11 @@ trait CometExprShim { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) + + def versionSpecificExprToProtoInternal( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = None } object CometEvalModeUtil { diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala index 5f4e3fba2b..60475d5fa2 100644 --- a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala @@ -19,6 +19,7 @@ package org.apache.comet.shims import org.apache.comet.expressions.CometEvalMode +import org.apache.comet.serde.ExprOuterClass.Expr import org.apache.spark.sql.catalyst.expressions._ /** @@ -34,6 +35,11 @@ trait CometExprShim { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) + + def versionSpecificExprToProtoInternal( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = None } object CometEvalModeUtil { diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala index 5f4e3fba2b..55511e5bc6 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala @@ -18,8 +18,16 @@ */ package org.apache.comet.shims +import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.CometEvalMode +import org.apache.comet.serde.ExprOuterClass.Expr +import org.apache.comet.serde.QueryPlanSerde.{castToProto, exprToProtoInternal} import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke +import org.apache.spark.sql.internal.types.StringTypeWithCollation +import org.apache.spark.sql.types.{BinaryType, BooleanType, DataTypes, StringType} + +import java.util.Locale /** * `CometExprShim` acts as a shim for for parsing expressions from different Spark versions. @@ -34,6 +42,42 @@ trait CometExprShim { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) + + def versionSpecificExprToProtoInternal( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + expr match { + case s: StaticInvoke + if s.staticObject == classOf[StringDecode] && + s.dataType.isInstanceOf[StringType] && + s.functionName == "decode" && + s.arguments.size == 4 && + s.inputTypes == Seq( + BinaryType, + StringTypeWithCollation(supportsTrimCollation = true), + BooleanType, + BooleanType) => + val Seq(bin, charset, _, _) = s.arguments + charset match { + case Literal(str, DataTypes.StringType) + if str.toString.toLowerCase(Locale.ROOT) == "utf-8" => + // decode(col, 'utf-8') can be treated as a cast with "try" eval mode that puts nulls + // for invalid strings. + // Left child is the binary expression. + castToProto( + expr, + None, + DataTypes.StringType, + exprToProtoInternal(bin, inputs, binding).get, + CometEvalMode.TRY) + case _ => + withInfo(expr, "Comet only supports decoding with 'utf-8'.") + None + } + case _ => None + } + } } object CometEvalModeUtil { diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala index d1f55cbe1e..ca87f1b3b7 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala @@ -38,7 +38,6 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType import org.apache.spark.sql.types._ -import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus import org.apache.comet.testing.{DataGenOptions, ParquetGenerator} class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper { @@ -273,7 +272,6 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("decode") { // https://github.com/apache/datafusion-comet/issues/1942 - assume(!isSpark40Plus) val df = spark.read.parquet(filename) df.createOrReplaceTempView("t1") // We want to make sure that the schema generator wasn't modified to accidentally omit From b830922a1e07f96823fd660df29cf7be0ef0042f Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 7 Aug 2025 10:59:20 +0200 Subject: [PATCH 2/3] review fixes --- .../apache/comet/serde/QueryPlanSerde.scala | 41 +++++++++++-------- .../apache/comet/shims/CometExprShim.scala | 22 ++-------- .../org/apache/comet/CometFuzzTestSuite.scala | 1 - 3 files changed, 28 insertions(+), 36 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 0c5616eb14..f2ebc83acf 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1286,22 +1286,7 @@ object QueryPlanSerde extends Logging with CometExprShim { case s: StringDecode => // Right child is the encoding expression. - s.charset match { - case Literal(str, DataTypes.StringType) - if str.toString.toLowerCase(Locale.ROOT) == "utf-8" => - // decode(col, 'utf-8') can be treated as a cast with "try" eval mode that puts nulls - // for invalid strings. - // Left child is the binary expression. - castToProto( - expr, - None, - DataTypes.StringType, - exprToProtoInternal(s.bin, inputs, binding).get, - CometEvalMode.TRY) - case _ => - withInfo(expr, "Comet only supports decoding with 'utf-8'.") - None - } + stringDecode(expr, s.charset, s.bin, inputs, binding) case RegExpReplace(subject, pattern, replacement, startPosition) => if (!RegExp.isSupportedPattern(pattern.toString) && @@ -1682,6 +1667,30 @@ object QueryPlanSerde extends Logging with CometExprShim { }) } + def stringDecode( + expr: Expression, + charset: Expression, + bin: Expression, + inputs: Seq[Attribute], + binding: Boolean) = { + charset match { + case Literal(str, DataTypes.StringType) + if str.toString.toLowerCase(Locale.ROOT) == "utf-8" => + // decode(col, 'utf-8') can be treated as a cast with "try" eval mode that puts nulls + // for invalid strings. + // Left child is the binary expression. + castToProto( + expr, + None, + DataTypes.StringType, + exprToProtoInternal(bin, inputs, binding).get, + CometEvalMode.TRY) + case _ => + withInfo(expr, "Comet only supports decoding with 'utf-8'.") + None + } + } + /** * Creates a UnaryExpr by calling exprToProtoInternal for the provided child expression and then * invokes the supplied function to wrap this UnaryExpr in a top-level Expr. diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala index 55511e5bc6..a020e3160b 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala @@ -18,14 +18,13 @@ */ package org.apache.comet.shims -import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.CometEvalMode import org.apache.comet.serde.ExprOuterClass.Expr -import org.apache.comet.serde.QueryPlanSerde.{castToProto, exprToProtoInternal} +import org.apache.comet.serde.QueryPlanSerde.stringDecode import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.internal.types.StringTypeWithCollation -import org.apache.spark.sql.types.{BinaryType, BooleanType, DataTypes, StringType} +import org.apache.spark.sql.types.{BinaryType, BooleanType, StringType} import java.util.Locale @@ -59,22 +58,7 @@ trait CometExprShim { BooleanType, BooleanType) => val Seq(bin, charset, _, _) = s.arguments - charset match { - case Literal(str, DataTypes.StringType) - if str.toString.toLowerCase(Locale.ROOT) == "utf-8" => - // decode(col, 'utf-8') can be treated as a cast with "try" eval mode that puts nulls - // for invalid strings. - // Left child is the binary expression. - castToProto( - expr, - None, - DataTypes.StringType, - exprToProtoInternal(bin, inputs, binding).get, - CometEvalMode.TRY) - case _ => - withInfo(expr, "Comet only supports decoding with 'utf-8'.") - None - } + stringDecode(expr, charset, bin, inputs, binding) case _ => None } } diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala index ca87f1b3b7..a1b1812b31 100644 --- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala @@ -271,7 +271,6 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("decode") { - // https://github.com/apache/datafusion-comet/issues/1942 val df = spark.read.parquet(filename) df.createOrReplaceTempView("t1") // We want to make sure that the schema generator wasn't modified to accidentally omit From 7f21fb2ad48248500c60d31364c37c5d05e89777 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Thu, 7 Aug 2025 11:13:25 +0200 Subject: [PATCH 3/3] move pre 4.0 `StringDecode` expression encoding to shims, style fixes --- .../scala/org/apache/comet/serde/QueryPlanSerde.scala | 8 ++------ .../org/apache/comet/shims/CometExprShim.scala | 11 ++++++++++- .../org/apache/comet/shims/CometExprShim.scala | 11 ++++++++++- .../org/apache/comet/shims/CometExprShim.scala | 3 +-- 4 files changed, 23 insertions(+), 10 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index f2ebc83acf..6aa161280a 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1284,10 +1284,6 @@ object QueryPlanSerde extends Logging with CometExprShim { optExprWithInfo(optExpr, expr, r.child) } - case s: StringDecode => - // Right child is the encoding expression. - stringDecode(expr, s.charset, s.bin, inputs, binding) - case RegExpReplace(subject, pattern, replacement, startPosition) => if (!RegExp.isSupportedPattern(pattern.toString) && !CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) { @@ -1672,10 +1668,10 @@ object QueryPlanSerde extends Logging with CometExprShim { charset: Expression, bin: Expression, inputs: Seq[Attribute], - binding: Boolean) = { + binding: Boolean): Option[Expr] = { charset match { case Literal(str, DataTypes.StringType) - if str.toString.toLowerCase(Locale.ROOT) == "utf-8" => + if str.toString.toLowerCase(Locale.ROOT) == "utf-8" => // decode(col, 'utf-8') can be treated as a cast with "try" eval mode that puts nulls // for invalid strings. // Left child is the binary expression. diff --git a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala index 60475d5fa2..2a302d8d41 100644 --- a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala @@ -20,6 +20,7 @@ package org.apache.comet.shims import org.apache.comet.expressions.CometEvalMode import org.apache.comet.serde.ExprOuterClass.Expr +import org.apache.comet.serde.QueryPlanSerde.stringDecode import org.apache.spark.sql.catalyst.expressions._ /** @@ -39,7 +40,15 @@ trait CometExprShim { def versionSpecificExprToProtoInternal( expr: Expression, inputs: Seq[Attribute], - binding: Boolean): Option[Expr] = None + binding: Boolean): Option[Expr] = { + expr match { + case s: StringDecode => + // Right child is the encoding expression. + stringDecode(expr, s.charset, s.bin, inputs, binding) + + case _ => None + } + } } object CometEvalModeUtil { diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala index 60475d5fa2..2a302d8d41 100644 --- a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala @@ -20,6 +20,7 @@ package org.apache.comet.shims import org.apache.comet.expressions.CometEvalMode import org.apache.comet.serde.ExprOuterClass.Expr +import org.apache.comet.serde.QueryPlanSerde.stringDecode import org.apache.spark.sql.catalyst.expressions._ /** @@ -39,7 +40,15 @@ trait CometExprShim { def versionSpecificExprToProtoInternal( expr: Expression, inputs: Seq[Attribute], - binding: Boolean): Option[Expr] = None + binding: Boolean): Option[Expr] = { + expr match { + case s: StringDecode => + // Right child is the encoding expression. + stringDecode(expr, s.charset, s.bin, inputs, binding) + + case _ => None + } + } } object CometEvalModeUtil { diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala index a020e3160b..1b8e5aaa04 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala @@ -26,8 +26,6 @@ import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.internal.types.StringTypeWithCollation import org.apache.spark.sql.types.{BinaryType, BooleanType, StringType} -import java.util.Locale - /** * `CometExprShim` acts as a shim for for parsing expressions from different Spark versions. */ @@ -59,6 +57,7 @@ trait CometExprShim { BooleanType) => val Seq(bin, charset, _, _) = s.arguments stringDecode(expr, charset, bin, inputs, binding) + case _ => None } }