From f0d3312caeb25237540008da5e23b80bdab83e4a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 18 Sep 2024 18:13:43 -0700 Subject: [PATCH 1/5] fix: Normalize NaN and zeros for floating number comparison --- .../apache/comet/serde/QueryPlanSerde.scala | 46 ++++++++++++------- .../apache/comet/CometExpressionSuite.scala | 16 +++++++ 2 files changed, 46 insertions(+), 16 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index dbc3a1d80a..92190ddc56 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1034,8 +1034,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim None case EqualTo(left, right) => - val leftExpr = exprToProtoInternal(left, inputs) - val rightExpr = exprToProtoInternal(right, inputs) + val leftExpr = exprToProtoInternal(normalizeNaNAndZero(left), inputs) + val rightExpr = exprToProtoInternal(normalizeNaNAndZero(right), inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.Equal.newBuilder() @@ -1053,8 +1053,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case Not(EqualTo(left, right)) => - val leftExpr = exprToProtoInternal(left, inputs) - val rightExpr = exprToProtoInternal(right, inputs) + val leftExpr = exprToProtoInternal(normalizeNaNAndZero(left), inputs) + val rightExpr = exprToProtoInternal(normalizeNaNAndZero(right), inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.NotEqual.newBuilder() @@ -1072,8 +1072,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case EqualNullSafe(left, right) => - val leftExpr = exprToProtoInternal(left, inputs) - val rightExpr = exprToProtoInternal(right, inputs) + val leftExpr = exprToProtoInternal(normalizeNaNAndZero(left), inputs) + val rightExpr = exprToProtoInternal(normalizeNaNAndZero(right), inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.EqualNullSafe.newBuilder() @@ -1091,8 +1091,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case Not(EqualNullSafe(left, right)) => - val leftExpr = exprToProtoInternal(left, inputs) - val rightExpr = exprToProtoInternal(right, inputs) + val leftExpr = exprToProtoInternal(normalizeNaNAndZero(left), inputs) + val rightExpr = exprToProtoInternal(normalizeNaNAndZero(right), inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.NotEqualNullSafe.newBuilder() @@ -1110,8 +1110,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case GreaterThan(left, right) => - val leftExpr = exprToProtoInternal(left, inputs) - val rightExpr = exprToProtoInternal(right, inputs) + val leftExpr = exprToProtoInternal(normalizeNaNAndZero(left), inputs) + val rightExpr = exprToProtoInternal(normalizeNaNAndZero(right), inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.GreaterThan.newBuilder() @@ -1129,8 +1129,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case GreaterThanOrEqual(left, right) => - val leftExpr = exprToProtoInternal(left, inputs) - val rightExpr = exprToProtoInternal(right, inputs) + val leftExpr = exprToProtoInternal(normalizeNaNAndZero(left), inputs) + val rightExpr = 
exprToProtoInternal(normalizeNaNAndZero(right), inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.GreaterThanEqual.newBuilder() @@ -1148,8 +1148,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case LessThan(left, right) => - val leftExpr = exprToProtoInternal(left, inputs) - val rightExpr = exprToProtoInternal(right, inputs) + val leftExpr = exprToProtoInternal(normalizeNaNAndZero(left), inputs) + val rightExpr = exprToProtoInternal(normalizeNaNAndZero(right), inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.LessThan.newBuilder() @@ -1167,8 +1167,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case LessThanOrEqual(left, right) => - val leftExpr = exprToProtoInternal(left, inputs) - val rightExpr = exprToProtoInternal(right, inputs) + val leftExpr = exprToProtoInternal(normalizeNaNAndZero(left), inputs) + val rightExpr = exprToProtoInternal(normalizeNaNAndZero(right), inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.LessThanEqual.newBuilder() @@ -2621,6 +2621,20 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim If(LessThanOrEqual(expression, zero), Literal.create(null, expression.dataType), expression) } + // Spark will normalize NaN and zero for floating point numbers for several cases. + // See `NormalizeFloatingNumbers` optimization rule in Spark. + // However, one exception is for comparison operators. Spark does not normalize NaN and zero + // because they are handled well in Spark (e.g., `SQLOrderingUtil.compareFloats`). But the + // comparison functions in arrow-rs do not normalize NaN and zero. So we need to normalize NaN + // and zero for comparison operators in Comet. + def normalizeNaNAndZero(expr: Expression): Expression = { + expr.dataType match { + case _: FloatType | _: DoubleType => + KnownFloatingPointNormalized(NormalizeNaNAndZero(expr)) + case _ => expr + } + } + /** * Returns true if given datatype is supported as a key in DataFusion sort merge join. 
*/ diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index c02875a68c..a4f244e056 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -41,6 +41,22 @@ import org.apache.comet.CometSparkSessionExtensions.{isSpark33Plus, isSpark34Plu class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { import testImplicits._ + test("compare true/false to negative zero") { + Seq(false, true).foreach { dictionary => + withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { + val table = "test" + withTable(table) { + sql(s"create table $table(col1 boolean, col2 float) using parquet") + sql(s"insert into $table values(true, -0.0)") + sql(s"insert into $table values(false, -0.0)") + + checkSparkAnswerAndOperator( + s"SELECT col1, negative(col2), cast(col1 as float), col1 = negative(col2) FROM $table") + } + } + } + } + test("coalesce should return correct datatype") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => From 87a0f35baf208e4697ed1fa51a428e3eda80ebf2 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 20 Sep 2024 00:08:51 -0700 Subject: [PATCH 2/5] fix --- .../comet/CometSparkSessionExtensions.scala | 48 ++++++++++++++++++- .../apache/comet/serde/QueryPlanSerde.scala | 46 +++++++----------- 2 files changed, 62 insertions(+), 32 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index fc28f53c02..b2bf9b82b0 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -25,8 +25,9 @@ import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.network.util.ByteUnit import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} -import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression} +import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, PlanExpression} import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial} +import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.comet._ @@ -47,6 +48,7 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExc import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{DoubleType, FloatType} import org.apache.comet.CometConf._ import org.apache.comet.CometExplainInfo.getActualPlan @@ -840,6 +842,48 @@ class CometSparkSessionExtensions } } + def normalizePlan(plan: SparkPlan): SparkPlan = { + plan.transformUp { + case p: ProjectExec => + val newProjectList = p.projectList.map(normalize(_).asInstanceOf[NamedExpression]) + ProjectExec(newProjectList, p.child) + case f: FilterExec => + val newCondition = normalize(f.condition) + FilterExec(newCondition, f.child) + } + } + + // Spark will normalize NaN and zero for floating point numbers for 
several cases. + // See `NormalizeFloatingNumbers` optimization rule in Spark. + // However, one exception is for comparison operators. Spark does not normalize NaN and zero + // because they are handled well in Spark (e.g., `SQLOrderingUtil.compareFloats`). But the + // comparison functions in arrow-rs do not normalize NaN and zero. So we need to normalize NaN + // and zero for comparison operators in Comet. + def normalize(expr: Expression): Expression = { + expr.transformUp { + case EqualTo(left, right) => + EqualTo(normalizeNaNAndZero(left), normalizeNaNAndZero(right)) + case EqualNullSafe(left, right) => + EqualNullSafe(normalizeNaNAndZero(left), normalizeNaNAndZero(right)) + case GreaterThan(left, right) => + GreaterThan(normalizeNaNAndZero(left), normalizeNaNAndZero(right)) + case GreaterThanOrEqual(left, right) => + GreaterThanOrEqual(normalizeNaNAndZero(left), normalizeNaNAndZero(right)) + case LessThan(left, right) => + LessThan(normalizeNaNAndZero(left), normalizeNaNAndZero(right)) + case LessThanOrEqual(left, right) => + LessThanOrEqual(normalizeNaNAndZero(left), normalizeNaNAndZero(right)) + } + } + + def normalizeNaNAndZero(expr: Expression): Expression = { + expr.dataType match { + case _: FloatType | _: DoubleType => + KnownFloatingPointNormalized(NormalizeNaNAndZero(expr)) + case _ => expr + } + } + override def apply(plan: SparkPlan): SparkPlan = { // DataFusion doesn't have ANSI mode. For now we just disable CometExec if ANSI mode is // enabled. @@ -865,7 +909,7 @@ class CometSparkSessionExtensions plan } } else { - var newPlan = transform(plan) + var newPlan = transform(normalizePlan(plan)) // if the plan cannot be run fully natively then explain why (when appropriate // config is enabled) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 92190ddc56..5ece08baee 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1034,8 +1034,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim None case EqualTo(left, right) => - val leftExpr = exprToProtoInternal(normalizeNaNAndZero(left), inputs) - val rightExpr = exprToProtoInternal(normalizeNaNAndZero(right), inputs) + val leftExpr = exprToProto(left, inputs) + val rightExpr = exprToProto(right, inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.Equal.newBuilder() @@ -1053,8 +1053,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case Not(EqualTo(left, right)) => - val leftExpr = exprToProtoInternal(normalizeNaNAndZero(left), inputs) - val rightExpr = exprToProtoInternal(normalizeNaNAndZero(right), inputs) + val leftExpr = exprToProto(left, inputs) + val rightExpr = exprToProto(right, inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.NotEqual.newBuilder() @@ -1072,8 +1072,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case EqualNullSafe(left, right) => - val leftExpr = exprToProtoInternal(normalizeNaNAndZero(left), inputs) - val rightExpr = exprToProtoInternal(normalizeNaNAndZero(right), inputs) + val leftExpr = exprToProto(left, inputs) + val rightExpr = exprToProto(right, inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.EqualNullSafe.newBuilder() @@ -1091,8 +1091,8 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde with CometExprShim } case Not(EqualNullSafe(left, right)) => - val leftExpr = exprToProtoInternal(normalizeNaNAndZero(left), inputs) - val rightExpr = exprToProtoInternal(normalizeNaNAndZero(right), inputs) + val leftExpr = exprToProto(left, inputs) + val rightExpr = exprToProto(right, inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.NotEqualNullSafe.newBuilder() @@ -1110,8 +1110,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case GreaterThan(left, right) => - val leftExpr = exprToProtoInternal(normalizeNaNAndZero(left), inputs) - val rightExpr = exprToProtoInternal(normalizeNaNAndZero(right), inputs) + val leftExpr = exprToProto(left, inputs) + val rightExpr = exprToProto(right, inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.GreaterThan.newBuilder() @@ -1129,8 +1129,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case GreaterThanOrEqual(left, right) => - val leftExpr = exprToProtoInternal(normalizeNaNAndZero(left), inputs) - val rightExpr = exprToProtoInternal(normalizeNaNAndZero(right), inputs) + val leftExpr = exprToProto(left, inputs) + val rightExpr = exprToProto(right, inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.GreaterThanEqual.newBuilder() @@ -1148,8 +1148,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case LessThan(left, right) => - val leftExpr = exprToProtoInternal(normalizeNaNAndZero(left), inputs) - val rightExpr = exprToProtoInternal(normalizeNaNAndZero(right), inputs) + val leftExpr = exprToProto(left, inputs) + val rightExpr = exprToProto(right, inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.LessThan.newBuilder() @@ -1167,8 +1167,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case LessThanOrEqual(left, right) => - val leftExpr = exprToProtoInternal(normalizeNaNAndZero(left), inputs) - val rightExpr = exprToProtoInternal(normalizeNaNAndZero(right), inputs) + val leftExpr = exprToProto(left, inputs) + val rightExpr = exprToProto(right, inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.LessThanEqual.newBuilder() @@ -2621,20 +2621,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim If(LessThanOrEqual(expression, zero), Literal.create(null, expression.dataType), expression) } - // Spark will normalize NaN and zero for floating point numbers for several cases. - // See `NormalizeFloatingNumbers` optimization rule in Spark. - // However, one exception is for comparison operators. Spark does not normalize NaN and zero - // because they are handled well in Spark (e.g., `SQLOrderingUtil.compareFloats`). But the - // comparison functions in arrow-rs do not normalize NaN and zero. So we need to normalize NaN - // and zero for comparison operators in Comet. - def normalizeNaNAndZero(expr: Expression): Expression = { - expr.dataType match { - case _: FloatType | _: DoubleType => - KnownFloatingPointNormalized(NormalizeNaNAndZero(expr)) - case _ => expr - } - } - /** * Returns true if given datatype is supported as a key in DataFusion sort merge join. 
*/ From 76856dc71e67a2a044025464f23c695973275b20 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 20 Sep 2024 15:22:26 -0700 Subject: [PATCH 3/5] fix --- .../apache/comet/CometSparkSessionExtensions.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala index b2bf9b82b0..86c116b268 100644 --- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala +++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala @@ -877,10 +877,14 @@ class CometSparkSessionExtensions } def normalizeNaNAndZero(expr: Expression): Expression = { - expr.dataType match { - case _: FloatType | _: DoubleType => - KnownFloatingPointNormalized(NormalizeNaNAndZero(expr)) - case _ => expr + expr match { + case _: KnownFloatingPointNormalized => expr + case _ => + expr.dataType match { + case _: FloatType | _: DoubleType => + KnownFloatingPointNormalized(NormalizeNaNAndZero(expr)) + case _ => expr + } } } From 14cfb118ada4fb16fda2749c9b6a3c81e840e12d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 20 Sep 2024 23:04:12 -0700 Subject: [PATCH 4/5] update --- .../apache/comet/serde/QueryPlanSerde.scala | 32 +++++++++---------- .../q21/explain.txt | 2 +- .../q34/explain.txt | 2 +- .../q39a/explain.txt | 8 ++--- .../q39b/explain.txt | 8 ++--- .../q73/explain.txt | 2 +- .../q21/explain.txt | 2 +- .../q34/explain.txt | 2 +- .../q39a/explain.txt | 8 ++--- .../q39b/explain.txt | 8 ++--- .../q73/explain.txt | 2 +- .../approved-plans-v1_4/q21/explain.txt | 2 +- .../approved-plans-v1_4/q34/explain.txt | 2 +- .../approved-plans-v1_4/q39a/explain.txt | 8 ++--- .../approved-plans-v1_4/q39b/explain.txt | 8 ++--- .../approved-plans-v1_4/q73/explain.txt | 2 +- .../q34/explain.txt | 2 +- .../q34/explain.txt | 2 +- .../approved-plans-v2_7/q34/explain.txt | 2 +- 19 files changed, 52 insertions(+), 52 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 5ece08baee..dbc3a1d80a 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1034,8 +1034,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim None case EqualTo(left, right) => - val leftExpr = exprToProto(left, inputs) - val rightExpr = exprToProto(right, inputs) + val leftExpr = exprToProtoInternal(left, inputs) + val rightExpr = exprToProtoInternal(right, inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.Equal.newBuilder() @@ -1053,8 +1053,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case Not(EqualTo(left, right)) => - val leftExpr = exprToProto(left, inputs) - val rightExpr = exprToProto(right, inputs) + val leftExpr = exprToProtoInternal(left, inputs) + val rightExpr = exprToProtoInternal(right, inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.NotEqual.newBuilder() @@ -1072,8 +1072,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case EqualNullSafe(left, right) => - val leftExpr = exprToProto(left, inputs) - val rightExpr = exprToProto(right, inputs) + val leftExpr = exprToProtoInternal(left, inputs) + val rightExpr = exprToProtoInternal(right, inputs) if (leftExpr.isDefined 
&& rightExpr.isDefined) { val builder = ExprOuterClass.EqualNullSafe.newBuilder() @@ -1091,8 +1091,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case Not(EqualNullSafe(left, right)) => - val leftExpr = exprToProto(left, inputs) - val rightExpr = exprToProto(right, inputs) + val leftExpr = exprToProtoInternal(left, inputs) + val rightExpr = exprToProtoInternal(right, inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.NotEqualNullSafe.newBuilder() @@ -1110,8 +1110,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case GreaterThan(left, right) => - val leftExpr = exprToProto(left, inputs) - val rightExpr = exprToProto(right, inputs) + val leftExpr = exprToProtoInternal(left, inputs) + val rightExpr = exprToProtoInternal(right, inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.GreaterThan.newBuilder() @@ -1129,8 +1129,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case GreaterThanOrEqual(left, right) => - val leftExpr = exprToProto(left, inputs) - val rightExpr = exprToProto(right, inputs) + val leftExpr = exprToProtoInternal(left, inputs) + val rightExpr = exprToProtoInternal(right, inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.GreaterThanEqual.newBuilder() @@ -1148,8 +1148,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case LessThan(left, right) => - val leftExpr = exprToProto(left, inputs) - val rightExpr = exprToProto(right, inputs) + val leftExpr = exprToProtoInternal(left, inputs) + val rightExpr = exprToProtoInternal(right, inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.LessThan.newBuilder() @@ -1167,8 +1167,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } case LessThanOrEqual(left, right) => - val leftExpr = exprToProto(left, inputs) - val rightExpr = exprToProto(right, inputs) + val leftExpr = exprToProtoInternal(left, inputs) + val rightExpr = exprToProtoInternal(right, inputs) if (leftExpr.isDefined && rightExpr.isDefined) { val builder = ExprOuterClass.LessThanEqual.newBuilder() diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q21/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q21/explain.txt index dce72db260..e432933e5e 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q21/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q21/explain.txt @@ -129,7 +129,7 @@ Functions [2]: [sum(CASE WHEN (d_date#12 < 2000-03-11) THEN inv_quantity_on_hand (22) CometFilter Input [4]: [w_warehouse_name#7, i_item_id#9, inv_before#15, inv_after#16] -Condition : (CASE WHEN (inv_before#15 > 0) THEN ((cast(inv_after#16 as double) / cast(inv_before#15 as double)) >= 0.666667) END AND CASE WHEN (inv_before#15 > 0) THEN ((cast(inv_after#16 as double) / cast(inv_before#15 as double)) <= 1.5) END) +Condition : (CASE WHEN (inv_before#15 > 0) THEN (knownfloatingpointnormalized(normalizenanandzero((cast(inv_after#16 as double) / cast(inv_before#15 as double)))) >= knownfloatingpointnormalized(normalizenanandzero(0.666667))) END AND CASE WHEN (inv_before#15 > 0) THEN (knownfloatingpointnormalized(normalizenanandzero((cast(inv_after#16 as double) / cast(inv_before#15 as double)))) <= 
knownfloatingpointnormalized(normalizenanandzero(1.5))) END) (23) CometTakeOrderedAndProject Input [4]: [w_warehouse_name#7, i_item_id#9, inv_before#15, inv_after#16] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q34/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q34/explain.txt index dccad5750c..7cfddd6a78 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q34/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q34/explain.txt @@ -110,7 +110,7 @@ ReadSchema: struct10000 ) OR (hd_buy_potential#13 = unknown ))) AND (hd_vehicle_count#15 > 0)) AND CASE WHEN (hd_vehicle_count#15 > 0) THEN ((cast(hd_dep_count#14 as double) / cast(hd_vehicle_count#15 as double)) > 1.2) END) AND isnotnull(hd_demo_sk#12)) +Condition : ((((isnotnull(hd_vehicle_count#15) AND ((hd_buy_potential#13 = >10000 ) OR (hd_buy_potential#13 = unknown ))) AND (hd_vehicle_count#15 > 0)) AND CASE WHEN (hd_vehicle_count#15 > 0) THEN (knownfloatingpointnormalized(normalizenanandzero((cast(hd_dep_count#14 as double) / cast(hd_vehicle_count#15 as double)))) > knownfloatingpointnormalized(normalizenanandzero(1.2))) END) AND isnotnull(hd_demo_sk#12)) (17) CometProject Input [4]: [hd_demo_sk#12, hd_buy_potential#13, hd_dep_count#14, hd_vehicle_count#15] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q39a/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q39a/explain.txt index 58e0e66495..96e213fffa 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q39a/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q39a/explain.txt @@ -152,11 +152,11 @@ Functions [2]: [stddev_samp(cast(inv_quantity_on_hand#3 as double)), avg(inv_qua (22) CometFilter Input [5]: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, stdev#17, mean#18] -Condition : CASE WHEN (mean#18 = 0.0) THEN false ELSE ((stdev#17 / mean#18) > 1.0) END +Condition : CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#18)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN false ELSE (knownfloatingpointnormalized(normalizenanandzero((stdev#17 / mean#18))) > knownfloatingpointnormalized(normalizenanandzero(1.0))) END (23) CometProject Input [5]: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, stdev#17, mean#18] -Arguments: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#18, cov#19], [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#18, CASE WHEN (mean#18 = 0.0) THEN null ELSE (stdev#17 / mean#18) END AS cov#19] +Arguments: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#18, cov#19], [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#18, CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#18)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN null ELSE (stdev#17 / mean#18) END AS cov#19] (24) Scan parquet spark_catalog.default.inventory Output [4]: [inv_item_sk#20, inv_warehouse_sk#21, inv_quantity_on_hand#22, inv_date_sk#23] @@ -238,11 +238,11 @@ Functions [2]: [stddev_samp(cast(inv_quantity_on_hand#22 as double)), avg(inv_qu (41) CometFilter Input [5]: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, stdev#17, mean#18] -Condition : CASE WHEN (mean#18 = 0.0) THEN false ELSE ((stdev#17 / mean#18) > 1.0) END +Condition : CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#18)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN 
false ELSE (knownfloatingpointnormalized(normalizenanandzero((stdev#17 / mean#18))) > knownfloatingpointnormalized(normalizenanandzero(1.0))) END (42) CometProject Input [5]: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, stdev#17, mean#18] -Arguments: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#36, cov#37], [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#18 AS mean#36, CASE WHEN (mean#18 = 0.0) THEN null ELSE (stdev#17 / mean#18) END AS cov#37] +Arguments: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#36, cov#37], [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#18 AS mean#36, CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#18)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN null ELSE (stdev#17 / mean#18) END AS cov#37] (43) CometBroadcastExchange Input [5]: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#36, cov#37] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q39b/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q39b/explain.txt index c246e93a5b..90b948ef3c 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q39b/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q39b/explain.txt @@ -152,11 +152,11 @@ Functions [2]: [stddev_samp(cast(inv_quantity_on_hand#3 as double)), avg(inv_qua (22) CometFilter Input [5]: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, stdev#17, mean#18] -Condition : (CASE WHEN (mean#18 = 0.0) THEN false ELSE ((stdev#17 / mean#18) > 1.0) END AND CASE WHEN (mean#18 = 0.0) THEN false ELSE ((stdev#17 / mean#18) > 1.5) END) +Condition : (CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#18)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN false ELSE (knownfloatingpointnormalized(normalizenanandzero((stdev#17 / mean#18))) > knownfloatingpointnormalized(normalizenanandzero(1.0))) END AND CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#18)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN false ELSE (knownfloatingpointnormalized(normalizenanandzero((stdev#17 / mean#18))) > knownfloatingpointnormalized(normalizenanandzero(1.5))) END) (23) CometProject Input [5]: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, stdev#17, mean#18] -Arguments: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#18, cov#19], [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#18, CASE WHEN (mean#18 = 0.0) THEN null ELSE (stdev#17 / mean#18) END AS cov#19] +Arguments: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#18, cov#19], [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#18, CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#18)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN null ELSE (stdev#17 / mean#18) END AS cov#19] (24) Scan parquet spark_catalog.default.inventory Output [4]: [inv_item_sk#20, inv_warehouse_sk#21, inv_quantity_on_hand#22, inv_date_sk#23] @@ -238,11 +238,11 @@ Functions [2]: [stddev_samp(cast(inv_quantity_on_hand#22 as double)), avg(inv_qu (41) CometFilter Input [5]: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, stdev#17, mean#18] -Condition : CASE WHEN (mean#18 = 0.0) THEN false ELSE ((stdev#17 / mean#18) > 1.0) END +Condition : CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#18)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN false ELSE (knownfloatingpointnormalized(normalizenanandzero((stdev#17 / mean#18))) > knownfloatingpointnormalized(normalizenanandzero(1.0))) END (42) 
CometProject Input [5]: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, stdev#17, mean#18] -Arguments: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#36, cov#37], [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#18 AS mean#36, CASE WHEN (mean#18 = 0.0) THEN null ELSE (stdev#17 / mean#18) END AS cov#37] +Arguments: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#36, cov#37], [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#18 AS mean#36, CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#18)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN null ELSE (stdev#17 / mean#18) END AS cov#37] (43) CometBroadcastExchange Input [5]: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#36, cov#37] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q73/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q73/explain.txt index 8b4e36c964..7530f5aa55 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q73/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q73/explain.txt @@ -110,7 +110,7 @@ ReadSchema: struct10000 ) OR (hd_buy_potential#13 = unknown ))) AND (hd_vehicle_count#15 > 0)) AND CASE WHEN (hd_vehicle_count#15 > 0) THEN ((cast(hd_dep_count#14 as double) / cast(hd_vehicle_count#15 as double)) > 1.0) END) AND isnotnull(hd_demo_sk#12)) +Condition : ((((isnotnull(hd_vehicle_count#15) AND ((hd_buy_potential#13 = >10000 ) OR (hd_buy_potential#13 = unknown ))) AND (hd_vehicle_count#15 > 0)) AND CASE WHEN (hd_vehicle_count#15 > 0) THEN (knownfloatingpointnormalized(normalizenanandzero((cast(hd_dep_count#14 as double) / cast(hd_vehicle_count#15 as double)))) > knownfloatingpointnormalized(normalizenanandzero(1.0))) END) AND isnotnull(hd_demo_sk#12)) (17) CometProject Input [4]: [hd_demo_sk#12, hd_buy_potential#13, hd_dep_count#14, hd_vehicle_count#15] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q21/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q21/explain.txt index b2f81dbc1e..de5cc95199 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q21/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q21/explain.txt @@ -136,7 +136,7 @@ Results [4]: [w_warehouse_name#7, i_item_id#9, sum(CASE WHEN (d_date#12 < 2000-0 (23) Filter [codegen id : 2] Input [4]: [w_warehouse_name#7, i_item_id#9, inv_before#19, inv_after#20] -Condition : (CASE WHEN (inv_before#19 > 0) THEN ((cast(inv_after#20 as double) / cast(inv_before#19 as double)) >= 0.666667) END AND CASE WHEN (inv_before#19 > 0) THEN ((cast(inv_after#20 as double) / cast(inv_before#19 as double)) <= 1.5) END) +Condition : (CASE WHEN (inv_before#19 > 0) THEN (knownfloatingpointnormalized(normalizenanandzero((cast(inv_after#20 as double) / cast(inv_before#19 as double)))) >= knownfloatingpointnormalized(normalizenanandzero(0.666667))) END AND CASE WHEN (inv_before#19 > 0) THEN (knownfloatingpointnormalized(normalizenanandzero((cast(inv_after#20 as double) / cast(inv_before#19 as double)))) <= knownfloatingpointnormalized(normalizenanandzero(1.5))) END) (24) TakeOrderedAndProject Input [4]: [w_warehouse_name#7, i_item_id#9, inv_before#19, inv_after#20] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q34/explain.txt 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q34/explain.txt index dccad5750c..7cfddd6a78 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q34/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q34/explain.txt @@ -110,7 +110,7 @@ ReadSchema: struct10000 ) OR (hd_buy_potential#13 = unknown ))) AND (hd_vehicle_count#15 > 0)) AND CASE WHEN (hd_vehicle_count#15 > 0) THEN ((cast(hd_dep_count#14 as double) / cast(hd_vehicle_count#15 as double)) > 1.2) END) AND isnotnull(hd_demo_sk#12)) +Condition : ((((isnotnull(hd_vehicle_count#15) AND ((hd_buy_potential#13 = >10000 ) OR (hd_buy_potential#13 = unknown ))) AND (hd_vehicle_count#15 > 0)) AND CASE WHEN (hd_vehicle_count#15 > 0) THEN (knownfloatingpointnormalized(normalizenanandzero((cast(hd_dep_count#14 as double) / cast(hd_vehicle_count#15 as double)))) > knownfloatingpointnormalized(normalizenanandzero(1.2))) END) AND isnotnull(hd_demo_sk#12)) (17) CometProject Input [4]: [hd_demo_sk#12, hd_buy_potential#13, hd_dep_count#14, hd_vehicle_count#15] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q39a/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q39a/explain.txt index 18064ce97b..bbe6950bda 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q39a/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q39a/explain.txt @@ -162,10 +162,10 @@ Results [5]: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, stddev_samp(cast(inv_quan (23) Filter [codegen id : 4] Input [5]: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, stdev#24, mean#25] -Condition : CASE WHEN (mean#25 = 0.0) THEN false ELSE ((stdev#24 / mean#25) > 1.0) END +Condition : CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#25)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN false ELSE (knownfloatingpointnormalized(normalizenanandzero((stdev#24 / mean#25))) > knownfloatingpointnormalized(normalizenanandzero(1.0))) END (24) Project [codegen id : 4] -Output [5]: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#25, CASE WHEN (mean#25 = 0.0) THEN null ELSE (stdev#24 / mean#25) END AS cov#26] +Output [5]: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#25, CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#25)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN null ELSE (stdev#24 / mean#25) END AS cov#26] Input [5]: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, stdev#24, mean#25] (25) Scan parquet spark_catalog.default.inventory @@ -255,10 +255,10 @@ Results [5]: [w_warehouse_sk#33, i_item_sk#32, d_moy#37, stddev_samp(cast(inv_qu (43) Filter [codegen id : 3] Input [5]: [w_warehouse_sk#33, i_item_sk#32, d_moy#37, stdev#48, mean#49] -Condition : CASE WHEN (mean#49 = 0.0) THEN false ELSE ((stdev#48 / mean#49) > 1.0) END +Condition : CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#49)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN false ELSE (knownfloatingpointnormalized(normalizenanandzero((stdev#48 / mean#49))) > knownfloatingpointnormalized(normalizenanandzero(1.0))) END (44) Project [codegen id : 3] -Output [5]: [w_warehouse_sk#33, i_item_sk#32, d_moy#37, mean#49, CASE WHEN (mean#49 = 0.0) THEN null ELSE (stdev#48 / mean#49) END AS cov#50] +Output [5]: [w_warehouse_sk#33, i_item_sk#32, d_moy#37, mean#49, CASE WHEN 
(knownfloatingpointnormalized(normalizenanandzero(mean#49)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN null ELSE (stdev#48 / mean#49) END AS cov#50] Input [5]: [w_warehouse_sk#33, i_item_sk#32, d_moy#37, stdev#48, mean#49] (45) BroadcastExchange diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q39b/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q39b/explain.txt index 279efee7b9..e43c23e7b7 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q39b/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q39b/explain.txt @@ -162,10 +162,10 @@ Results [5]: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, stddev_samp(cast(inv_quan (23) Filter [codegen id : 4] Input [5]: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, stdev#24, mean#25] -Condition : (CASE WHEN (mean#25 = 0.0) THEN false ELSE ((stdev#24 / mean#25) > 1.0) END AND CASE WHEN (mean#25 = 0.0) THEN false ELSE ((stdev#24 / mean#25) > 1.5) END) +Condition : (CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#25)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN false ELSE (knownfloatingpointnormalized(normalizenanandzero((stdev#24 / mean#25))) > knownfloatingpointnormalized(normalizenanandzero(1.0))) END AND CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#25)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN false ELSE (knownfloatingpointnormalized(normalizenanandzero((stdev#24 / mean#25))) > knownfloatingpointnormalized(normalizenanandzero(1.5))) END) (24) Project [codegen id : 4] -Output [5]: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#25, CASE WHEN (mean#25 = 0.0) THEN null ELSE (stdev#24 / mean#25) END AS cov#26] +Output [5]: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#25, CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#25)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN null ELSE (stdev#24 / mean#25) END AS cov#26] Input [5]: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, stdev#24, mean#25] (25) Scan parquet spark_catalog.default.inventory @@ -255,10 +255,10 @@ Results [5]: [w_warehouse_sk#33, i_item_sk#32, d_moy#37, stddev_samp(cast(inv_qu (43) Filter [codegen id : 3] Input [5]: [w_warehouse_sk#33, i_item_sk#32, d_moy#37, stdev#48, mean#49] -Condition : CASE WHEN (mean#49 = 0.0) THEN false ELSE ((stdev#48 / mean#49) > 1.0) END +Condition : CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#49)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN false ELSE (knownfloatingpointnormalized(normalizenanandzero((stdev#48 / mean#49))) > knownfloatingpointnormalized(normalizenanandzero(1.0))) END (44) Project [codegen id : 3] -Output [5]: [w_warehouse_sk#33, i_item_sk#32, d_moy#37, mean#49, CASE WHEN (mean#49 = 0.0) THEN null ELSE (stdev#48 / mean#49) END AS cov#50] +Output [5]: [w_warehouse_sk#33, i_item_sk#32, d_moy#37, mean#49, CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#49)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN null ELSE (stdev#48 / mean#49) END AS cov#50] Input [5]: [w_warehouse_sk#33, i_item_sk#32, d_moy#37, stdev#48, mean#49] (45) BroadcastExchange diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q73/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q73/explain.txt index 8b4e36c964..7530f5aa55 100644 --- 
a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q73/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark4_0/q73/explain.txt @@ -110,7 +110,7 @@ ReadSchema: struct10000 ) OR (hd_buy_potential#13 = unknown ))) AND (hd_vehicle_count#15 > 0)) AND CASE WHEN (hd_vehicle_count#15 > 0) THEN ((cast(hd_dep_count#14 as double) / cast(hd_vehicle_count#15 as double)) > 1.0) END) AND isnotnull(hd_demo_sk#12)) +Condition : ((((isnotnull(hd_vehicle_count#15) AND ((hd_buy_potential#13 = >10000 ) OR (hd_buy_potential#13 = unknown ))) AND (hd_vehicle_count#15 > 0)) AND CASE WHEN (hd_vehicle_count#15 > 0) THEN (knownfloatingpointnormalized(normalizenanandzero((cast(hd_dep_count#14 as double) / cast(hd_vehicle_count#15 as double)))) > knownfloatingpointnormalized(normalizenanandzero(1.0))) END) AND isnotnull(hd_demo_sk#12)) (17) CometProject Input [4]: [hd_demo_sk#12, hd_buy_potential#13, hd_dep_count#14, hd_vehicle_count#15] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q21/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q21/explain.txt index dce72db260..e432933e5e 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q21/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q21/explain.txt @@ -129,7 +129,7 @@ Functions [2]: [sum(CASE WHEN (d_date#12 < 2000-03-11) THEN inv_quantity_on_hand (22) CometFilter Input [4]: [w_warehouse_name#7, i_item_id#9, inv_before#15, inv_after#16] -Condition : (CASE WHEN (inv_before#15 > 0) THEN ((cast(inv_after#16 as double) / cast(inv_before#15 as double)) >= 0.666667) END AND CASE WHEN (inv_before#15 > 0) THEN ((cast(inv_after#16 as double) / cast(inv_before#15 as double)) <= 1.5) END) +Condition : (CASE WHEN (inv_before#15 > 0) THEN (knownfloatingpointnormalized(normalizenanandzero((cast(inv_after#16 as double) / cast(inv_before#15 as double)))) >= knownfloatingpointnormalized(normalizenanandzero(0.666667))) END AND CASE WHEN (inv_before#15 > 0) THEN (knownfloatingpointnormalized(normalizenanandzero((cast(inv_after#16 as double) / cast(inv_before#15 as double)))) <= knownfloatingpointnormalized(normalizenanandzero(1.5))) END) (23) CometTakeOrderedAndProject Input [4]: [w_warehouse_name#7, i_item_id#9, inv_before#15, inv_after#16] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q34/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q34/explain.txt index dccad5750c..7cfddd6a78 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q34/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q34/explain.txt @@ -110,7 +110,7 @@ ReadSchema: struct10000 ) OR (hd_buy_potential#13 = unknown ))) AND (hd_vehicle_count#15 > 0)) AND CASE WHEN (hd_vehicle_count#15 > 0) THEN ((cast(hd_dep_count#14 as double) / cast(hd_vehicle_count#15 as double)) > 1.2) END) AND isnotnull(hd_demo_sk#12)) +Condition : ((((isnotnull(hd_vehicle_count#15) AND ((hd_buy_potential#13 = >10000 ) OR (hd_buy_potential#13 = unknown ))) AND (hd_vehicle_count#15 > 0)) AND CASE WHEN (hd_vehicle_count#15 > 0) THEN (knownfloatingpointnormalized(normalizenanandzero((cast(hd_dep_count#14 as double) / cast(hd_vehicle_count#15 as double)))) > knownfloatingpointnormalized(normalizenanandzero(1.2))) END) AND isnotnull(hd_demo_sk#12)) (17) CometProject Input [4]: [hd_demo_sk#12, hd_buy_potential#13, hd_dep_count#14, 
hd_vehicle_count#15] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q39a/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q39a/explain.txt index 58e0e66495..96e213fffa 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q39a/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q39a/explain.txt @@ -152,11 +152,11 @@ Functions [2]: [stddev_samp(cast(inv_quantity_on_hand#3 as double)), avg(inv_qua (22) CometFilter Input [5]: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, stdev#17, mean#18] -Condition : CASE WHEN (mean#18 = 0.0) THEN false ELSE ((stdev#17 / mean#18) > 1.0) END +Condition : CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#18)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN false ELSE (knownfloatingpointnormalized(normalizenanandzero((stdev#17 / mean#18))) > knownfloatingpointnormalized(normalizenanandzero(1.0))) END (23) CometProject Input [5]: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, stdev#17, mean#18] -Arguments: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#18, cov#19], [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#18, CASE WHEN (mean#18 = 0.0) THEN null ELSE (stdev#17 / mean#18) END AS cov#19] +Arguments: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#18, cov#19], [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#18, CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#18)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN null ELSE (stdev#17 / mean#18) END AS cov#19] (24) Scan parquet spark_catalog.default.inventory Output [4]: [inv_item_sk#20, inv_warehouse_sk#21, inv_quantity_on_hand#22, inv_date_sk#23] @@ -238,11 +238,11 @@ Functions [2]: [stddev_samp(cast(inv_quantity_on_hand#22 as double)), avg(inv_qu (41) CometFilter Input [5]: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, stdev#17, mean#18] -Condition : CASE WHEN (mean#18 = 0.0) THEN false ELSE ((stdev#17 / mean#18) > 1.0) END +Condition : CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#18)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN false ELSE (knownfloatingpointnormalized(normalizenanandzero((stdev#17 / mean#18))) > knownfloatingpointnormalized(normalizenanandzero(1.0))) END (42) CometProject Input [5]: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, stdev#17, mean#18] -Arguments: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#36, cov#37], [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#18 AS mean#36, CASE WHEN (mean#18 = 0.0) THEN null ELSE (stdev#17 / mean#18) END AS cov#37] +Arguments: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#36, cov#37], [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#18 AS mean#36, CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#18)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN null ELSE (stdev#17 / mean#18) END AS cov#37] (43) CometBroadcastExchange Input [5]: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#36, cov#37] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q39b/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q39b/explain.txt index c246e93a5b..90b948ef3c 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q39b/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q39b/explain.txt @@ -152,11 +152,11 @@ Functions [2]: [stddev_samp(cast(inv_quantity_on_hand#3 as double)), avg(inv_qua (22) CometFilter Input [5]: 
[w_warehouse_sk#7, i_item_sk#6, d_moy#11, stdev#17, mean#18] -Condition : (CASE WHEN (mean#18 = 0.0) THEN false ELSE ((stdev#17 / mean#18) > 1.0) END AND CASE WHEN (mean#18 = 0.0) THEN false ELSE ((stdev#17 / mean#18) > 1.5) END) +Condition : (CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#18)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN false ELSE (knownfloatingpointnormalized(normalizenanandzero((stdev#17 / mean#18))) > knownfloatingpointnormalized(normalizenanandzero(1.0))) END AND CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#18)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN false ELSE (knownfloatingpointnormalized(normalizenanandzero((stdev#17 / mean#18))) > knownfloatingpointnormalized(normalizenanandzero(1.5))) END) (23) CometProject Input [5]: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, stdev#17, mean#18] -Arguments: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#18, cov#19], [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#18, CASE WHEN (mean#18 = 0.0) THEN null ELSE (stdev#17 / mean#18) END AS cov#19] +Arguments: [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#18, cov#19], [w_warehouse_sk#7, i_item_sk#6, d_moy#11, mean#18, CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#18)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN null ELSE (stdev#17 / mean#18) END AS cov#19] (24) Scan parquet spark_catalog.default.inventory Output [4]: [inv_item_sk#20, inv_warehouse_sk#21, inv_quantity_on_hand#22, inv_date_sk#23] @@ -238,11 +238,11 @@ Functions [2]: [stddev_samp(cast(inv_quantity_on_hand#22 as double)), avg(inv_qu (41) CometFilter Input [5]: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, stdev#17, mean#18] -Condition : CASE WHEN (mean#18 = 0.0) THEN false ELSE ((stdev#17 / mean#18) > 1.0) END +Condition : CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#18)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN false ELSE (knownfloatingpointnormalized(normalizenanandzero((stdev#17 / mean#18))) > knownfloatingpointnormalized(normalizenanandzero(1.0))) END (42) CometProject Input [5]: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, stdev#17, mean#18] -Arguments: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#36, cov#37], [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#18 AS mean#36, CASE WHEN (mean#18 = 0.0) THEN null ELSE (stdev#17 / mean#18) END AS cov#37] +Arguments: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#36, cov#37], [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#18 AS mean#36, CASE WHEN (knownfloatingpointnormalized(normalizenanandzero(mean#18)) = knownfloatingpointnormalized(normalizenanandzero(0.0))) THEN null ELSE (stdev#17 / mean#18) END AS cov#37] (43) CometBroadcastExchange Input [5]: [w_warehouse_sk#26, i_item_sk#25, d_moy#30, mean#36, cov#37] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q73/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q73/explain.txt index 8b4e36c964..7530f5aa55 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q73/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q73/explain.txt @@ -110,7 +110,7 @@ ReadSchema: struct10000 ) OR (hd_buy_potential#13 = unknown ))) AND (hd_vehicle_count#15 > 0)) AND CASE WHEN (hd_vehicle_count#15 > 0) THEN ((cast(hd_dep_count#14 as double) / cast(hd_vehicle_count#15 as double)) > 1.0) END) AND isnotnull(hd_demo_sk#12)) +Condition : ((((isnotnull(hd_vehicle_count#15) 
AND ((hd_buy_potential#13 = >10000 ) OR (hd_buy_potential#13 = unknown ))) AND (hd_vehicle_count#15 > 0)) AND CASE WHEN (hd_vehicle_count#15 > 0) THEN (knownfloatingpointnormalized(normalizenanandzero((cast(hd_dep_count#14 as double) / cast(hd_vehicle_count#15 as double)))) > knownfloatingpointnormalized(normalizenanandzero(1.0))) END) AND isnotnull(hd_demo_sk#12)) (17) CometProject Input [4]: [hd_demo_sk#12, hd_buy_potential#13, hd_dep_count#14, hd_vehicle_count#15] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q34/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q34/explain.txt index a8800a70ff..dd5097203e 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q34/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q34/explain.txt @@ -110,7 +110,7 @@ ReadSchema: struct10000 ) OR (hd_buy_potential#13 = unknown ))) AND (hd_vehicle_count#15 > 0)) AND CASE WHEN (hd_vehicle_count#15 > 0) THEN ((cast(hd_dep_count#14 as double) / cast(hd_vehicle_count#15 as double)) > 1.2) END) AND isnotnull(hd_demo_sk#12)) +Condition : ((((isnotnull(hd_vehicle_count#15) AND ((hd_buy_potential#13 = >10000 ) OR (hd_buy_potential#13 = unknown ))) AND (hd_vehicle_count#15 > 0)) AND CASE WHEN (hd_vehicle_count#15 > 0) THEN (knownfloatingpointnormalized(normalizenanandzero((cast(hd_dep_count#14 as double) / cast(hd_vehicle_count#15 as double)))) > knownfloatingpointnormalized(normalizenanandzero(1.2))) END) AND isnotnull(hd_demo_sk#12)) (17) CometProject Input [4]: [hd_demo_sk#12, hd_buy_potential#13, hd_dep_count#14, hd_vehicle_count#15] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark4_0/q34/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark4_0/q34/explain.txt index a8800a70ff..dd5097203e 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark4_0/q34/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark4_0/q34/explain.txt @@ -110,7 +110,7 @@ ReadSchema: struct10000 ) OR (hd_buy_potential#13 = unknown ))) AND (hd_vehicle_count#15 > 0)) AND CASE WHEN (hd_vehicle_count#15 > 0) THEN ((cast(hd_dep_count#14 as double) / cast(hd_vehicle_count#15 as double)) > 1.2) END) AND isnotnull(hd_demo_sk#12)) +Condition : ((((isnotnull(hd_vehicle_count#15) AND ((hd_buy_potential#13 = >10000 ) OR (hd_buy_potential#13 = unknown ))) AND (hd_vehicle_count#15 > 0)) AND CASE WHEN (hd_vehicle_count#15 > 0) THEN (knownfloatingpointnormalized(normalizenanandzero((cast(hd_dep_count#14 as double) / cast(hd_vehicle_count#15 as double)))) > knownfloatingpointnormalized(normalizenanandzero(1.2))) END) AND isnotnull(hd_demo_sk#12)) (17) CometProject Input [4]: [hd_demo_sk#12, hd_buy_potential#13, hd_dep_count#14, hd_vehicle_count#15] diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q34/explain.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q34/explain.txt index a8800a70ff..dd5097203e 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q34/explain.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q34/explain.txt @@ -110,7 +110,7 @@ ReadSchema: struct10000 ) OR (hd_buy_potential#13 = unknown ))) AND (hd_vehicle_count#15 > 0)) AND CASE WHEN (hd_vehicle_count#15 > 0) THEN ((cast(hd_dep_count#14 as double) / cast(hd_vehicle_count#15 as double)) > 1.2) END) 
AND isnotnull(hd_demo_sk#12)) +Condition : ((((isnotnull(hd_vehicle_count#15) AND ((hd_buy_potential#13 = >10000 ) OR (hd_buy_potential#13 = unknown ))) AND (hd_vehicle_count#15 > 0)) AND CASE WHEN (hd_vehicle_count#15 > 0) THEN (knownfloatingpointnormalized(normalizenanandzero((cast(hd_dep_count#14 as double) / cast(hd_vehicle_count#15 as double)))) > knownfloatingpointnormalized(normalizenanandzero(1.2))) END) AND isnotnull(hd_demo_sk#12)) (17) CometProject Input [4]: [hd_demo_sk#12, hd_buy_potential#13, hd_dep_count#14, hd_vehicle_count#15] From 53e2443be72a5ff4e4422a03a0044f484f8a563b Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 22 Sep 2024 12:01:44 -0700 Subject: [PATCH 5/5] Add doc --- docs/source/user-guide/compatibility.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md index 5e8499928b..031322e9f3 100644 --- a/docs/source/user-guide/compatibility.md +++ b/docs/source/user-guide/compatibility.md @@ -38,6 +38,14 @@ Comet uses the Rust regexp crate for evaluating regular expressions, and this ha regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`. +## Floating point number comparison + +Spark normalizes NaN and zero for floating point numbers in several cases. See the `NormalizeFloatingNumbers` optimization rule in Spark. +However, comparison is an exception: Spark does not normalize NaN and zero when comparing values +because its own comparisons already handle them correctly (e.g., `SQLOrderingUtil.compareFloats`). But the comparison +functions of arrow-rs used by DataFusion do not normalize NaN and zero (e.g., [arrow::compute::kernels::cmp::eq](https://docs.rs/arrow/latest/arrow/compute/kernels/cmp/fn.eq.html#)). +So Comet adds additional normalization expressions for NaN and zero when translating comparisons. + ## Cast Cast operations in Comet fall into three levels of support:
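
For illustration only (not part of the patch series above): a minimal Scala sketch of the normalization described in the floating point comparison note, using the same Catalyst classes imported in PATCH 2. The `NormalizeSketch` object and the attribute names `a` and `b` are hypothetical, and the sketch assumes a classpath where these Spark Catalyst internals are available.

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, KnownFloatingPointNormalized}
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
import org.apache.spark.sql.types.{DoubleType, FloatType}

object NormalizeSketch {
  // Same shape as the normalizeNaNAndZero helper in the patches above: wrap
  // Float/Double operands so that NaN compares equal to NaN and -0.0 compares
  // equal to 0.0 before the expression reaches arrow-rs comparison kernels.
  def normalizeNaNAndZero(expr: Expression): Expression = expr.dataType match {
    case _: FloatType | _: DoubleType =>
      KnownFloatingPointNormalized(NormalizeNaNAndZero(expr))
    case _ => expr
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical double columns standing in for real plan attributes.
    val a = AttributeReference("a", DoubleType)()
    val b = AttributeReference("b", DoubleType)()

    // `a = b` becomes
    //   knownfloatingpointnormalized(normalizenanandzero(a)) =
    //     knownfloatingpointnormalized(normalizenanandzero(b))
    // which is the shape visible in the regenerated TPC-DS explain.txt files above.
    val normalized = EqualTo(normalizeNaNAndZero(a), normalizeNaNAndZero(b))
    println(normalized.sql)
  }
}
```

PATCH 3 additionally guards against double wrapping by returning the expression unchanged when it is already a `KnownFloatingPointNormalized`.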