[VL] The result of sum(double) sometimes mismatches with Vanilla Spark #8985

@kecookier

Description

Backend

VL (Velox)

Bug description

Summation of double values is not associative. In Velox, when a flush of the partial HashAgg is triggered, the order in which the double values are added changes, making the result unstable. Vanilla Spark has no such flush, so when the input is ordered the result is stable. In our production ETL we run queries like select c1, cast(sum(c2) as bigint) from t1 group by c1, and the result from Gluten does not match the result from Vanilla Spark. Average of double shows the same behavior.

A simple Java test shows that the result of summing the same double values depends on the order of the additions:

public static void test3() {
    double a = 24.621, b = 12.14, c = 0.169, d = 6.865, e = 1.879, f = 16.326;
    // The same six values, summed in four different orders.
    double sum1 = ((((a + f) + b) + d) + e) + c;
    double sum2 = ((((c + e) + d) + b) + f) + a;
    double sum3 = ((a + c) + (f + e)) + (b + d);
    double sum4 = (a + b) + (c + d) + (e + f);

    // Printing 15 decimal places exposes the rounding differences;
    // the cast to long mirrors SQL's cast(sum(...) as bigint).
    System.out.printf("Sum1: %.15f, as long: %d%n", sum1, (long) sum1);
    System.out.printf("Sum2: %.15f, as long: %d%n", sum2, (long) sum2);
    System.out.printf("Sum3: %.15f, as long: %d%n", sum3, (long) sum3);
    System.out.printf("Sum4: %.15f, as long: %d%n", sum4, (long) sum4);
}

/*
Sum1: 62.000000000000000, as long: 62
Sum2: 62.000000000000000, as long: 62
Sum3: 62.000000000000010, as long: 62
Sum4: 61.999999999999990, as long: 61
*/ 
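To connect this to the flush behavior, here is a minimal standalone sketch (plain Java, not Velox code; the class name FlushOrderDemo and the batch size of 2 are illustrative, the latter mirroring the maxBatchSize setting in the repro below). It sums the same six values once in a single accumulation pass (the sum1 grouping above, which rounds to exactly 62) and once in two-element batches whose partial sums are combined afterwards, mimicking a partial aggregation that flushes small batches (the sum4 grouping, which rounds to 61.999...), so the bigint casts disagree:

```java
public class FlushOrderDemo {
    public static void main(String[] args) {
        // The six values from the test above, in one accumulation order:
        // ((((a + f) + b) + d) + e) + c, i.e. sum1, which is exactly 62.0.
        double[] onePass = {24.621, 16.326, 12.14, 6.865, 1.879, 0.169};
        double sequential = 0.0;
        for (double v : onePass) {
            sequential += v;
        }

        // The same values in input order, summed in 2-element batches whose
        // partial sums are combined afterwards, mimicking a partial agg that
        // flushes small batches: (a + b) + (c + d) + (e + f), i.e. sum4.
        double[] input = {24.621, 12.14, 0.169, 6.865, 1.879, 16.326};
        double batched = 0.0;
        for (int i = 0; i < input.length; i += 2) {
            batched += input[i] + input[i + 1];
        }

        System.out.printf("sequential: %.15f, as long: %d%n", sequential, (long) sequential);
        System.out.printf("batched:    %.15f, as long: %d%n", batched, (long) batched);
        // sequential casts to 62, batched casts to 61: the same rows,
        // the same query, two different bigint results.
    }
}
```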

To reproduce the issue with a Gluten test:

test("flushable aggregate rule - disable when double sum") {
    withSQLConf(
      "spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemory" -> "100",
      "spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput" -> "false",
      "spark.gluten.sql.columnar.maxBatchSize" -> "2"
    ) {
      withTempView("t1") {
        import testImplicits._
        Seq((24.621d, 1), (12.14d, 1), (0.169d, 1), (6.865d, 1), (1.879d, 1), (16.326d, 1))
          .toDF("c1", "c2")
          .createOrReplaceTempView("t1")
        runQueryAndCompare("select c2, cast(sum(c1) as bigint) from t1 group by c2") {
          df =>
            {
              assert(
                getExecutedPlan(df).count(
                  plan => {
                    plan.isInstanceOf[RegularHashAggregateExecTransformer]
                  }) == 1)
              assert(
                getExecutedPlan(df).count(
                  plan => {
                    plan.isInstanceOf[FlushableHashAggregateExecTransformer]
                  }) == 1)
            }
        }
      }
    }
}

Spark version

Spark-3.5.x

Spark configurations

No response

System information

No response

Relevant logs

Labels

bug, triage