From 025d4bc3b33eb08623b2ff6e2e8392aa8032a510 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 16 Sep 2025 08:59:24 -0600
Subject: [PATCH 1/6] Refactor fuzz tests into a shared base class

---
 .../comet/CometFuzzAggregateSuite.scala       |  90 +++++++++++++
 .../org/apache/comet/CometFuzzTestBase.scala  | 110 ++++++++++++++++
 .../org/apache/comet/CometFuzzTestSuite.scala | 121 +-----------------
 3 files changed, 202 insertions(+), 119 deletions(-)
 create mode 100644 spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala
 create mode 100644 spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala

diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala
new file mode 100644
index 0000000000..6c625ae053
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet
+
+class CometFuzzAggregateSuite extends CometFuzzTestBase {
+
+  test("count distinct") {
+    val df = spark.read.parquet(filename)
+    df.createOrReplaceTempView("t1")
+    for (col <- df.columns) {
+      val sql = s"SELECT count(distinct $col) FROM t1"
+      val (_, cometPlan) = checkSparkAnswer(sql)
+      if (usingDataSourceExec) {
+        assert(1 == collectNativeScans(cometPlan).length)
+      }
+    }
+  }
+
+  test("count(*) group by single column") {
+    val df = spark.read.parquet(filename)
+    df.createOrReplaceTempView("t1")
+    for (col <- df.columns) {
+      // cannot run fully natively due to range partitioning and sort
+      val sql = s"SELECT $col, count(*) FROM t1 GROUP BY $col ORDER BY $col"
+      val (_, cometPlan) = checkSparkAnswer(sql)
+      if (usingDataSourceExec) {
+        assert(1 == collectNativeScans(cometPlan).length)
+      }
+    }
+  }
+
+  test("count(col) group by single column") {
+    val df = spark.read.parquet(filename)
+    df.createOrReplaceTempView("t1")
+    val groupCol = df.columns.head
+    for (col <- df.columns.drop(1)) {
+      // cannot run fully natively due to range partitioning and sort
+      val sql = s"SELECT $groupCol, count($col) FROM t1 GROUP BY $groupCol ORDER BY $groupCol"
+      val (_, cometPlan) = checkSparkAnswer(sql)
+      if (usingDataSourceExec) {
+        assert(1 == collectNativeScans(cometPlan).length)
+      }
+    }
+  }
+
+  test("count(col1, col2, ..) group by single column") {
+    val df = spark.read.parquet(filename)
+    df.createOrReplaceTempView("t1")
+    val groupCol = df.columns.head
+    val otherCols = df.columns.drop(1)
+    // cannot run fully natively due to range partitioning and sort
+    val sql = s"SELECT $groupCol, count(${otherCols.mkString(", ")}) FROM t1 " +
+      s"GROUP BY $groupCol ORDER BY $groupCol"
+    val (_, cometPlan) = checkSparkAnswer(sql)
+    if (usingDataSourceExec) {
+      assert(1 == collectNativeScans(cometPlan).length)
+    }
+  }
+
+  test("min/max aggregate") {
+    val df = spark.read.parquet(filename)
+    df.createOrReplaceTempView("t1")
+    for (col <- df.columns) {
+      // cannot run fully natively due to HashAggregate
+      val sql = s"SELECT min($col), max($col) FROM t1"
+      val (_, cometPlan) = checkSparkAnswer(sql)
+      if (usingDataSourceExec) {
+        assert(1 == collectNativeScans(cometPlan).length)
+      }
+    }
+  }
+
+}
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
new file mode 100644
index 0000000000..a69080e446
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestBase.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet
+
+import java.io.File
+import java.text.SimpleDateFormat
+
+import scala.util.Random
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.CometTestBase
+import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.internal.SQLConf
+
+import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
+
+abstract class CometFuzzTestBase extends CometTestBase with AdaptiveSparkPlanHelper {
+
+  var filename: String = null
+
+  /**
+   * We use Asia/Kathmandu because it has a non-zero number of minutes as its offset, so it is
+   * an interesting edge case. Also, this timezone tends to be different from the default
+   * system timezone.
+   *
+   * Represents UTC+5:45
+   */
+  val defaultTimezone = "Asia/Kathmandu"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val tempDir = System.getProperty("java.io.tmpdir")
+    filename = s"$tempDir/CometFuzzTestSuite_${System.currentTimeMillis()}.parquet"
+    val random = new Random(42)
+    withSQLConf(
+      CometConf.COMET_ENABLED.key -> "false",
+      SQLConf.SESSION_LOCAL_TIMEZONE.key -> defaultTimezone) {
+      val options =
+        DataGenOptions(
+          generateArray = true,
+          generateStruct = true,
+          generateNegativeZero = false,
+          // override base date due to known issues with experimental scans
+          baseDate =
+            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2024-05-25 12:34:56").getTime)
+      ParquetGenerator.makeParquetFile(random, spark, filename, 1000, options)
+    }
+  }
+
+  protected override def afterAll(): Unit = {
+    super.afterAll()
+    FileUtils.deleteDirectory(new File(filename))
+  }
+
+  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
+      pos: Position): Unit = {
+    Seq("native", "jvm").foreach { shuffleMode =>
+      Seq(
+        CometConf.SCAN_NATIVE_COMET,
+        CometConf.SCAN_NATIVE_DATAFUSION,
+        CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach { scanImpl =>
+        super.test(testName + s" ($scanImpl, $shuffleMode shuffle)", testTags: _*) {
+          withSQLConf(
+            CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl,
+            CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true",
+            CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) {
+            testFun
+          }
+        }
+      }
+    }
+  }
+
+  def collectNativeScans(plan: SparkPlan): Seq[SparkPlan] = {
+    collect(plan) {
+      case scan: CometScanExec => scan
+      case scan: CometNativeScanExec => scan
+    }
+  }
+
+  def collectCometShuffleExchanges(plan: SparkPlan): Seq[SparkPlan] = {
+    collect(plan) { case exchange: CometShuffleExchangeExec =>
+      exchange
+    }
+  }
+
+}
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
index ed250e141c..1b65dd8d44 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
@@ -19,21 +19,10 @@
 
 package org.apache.comet
 
-import java.io.File
-import java.text.SimpleDateFormat
-
 import scala.util.Random
 
-import org.scalactic.source.Position
-import org.scalatest.Tag
-
 import org.apache.commons.codec.binary.Hex
-import org.apache.commons.io.FileUtils
-import org.apache.spark.sql.CometTestBase
-import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
-import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
 import org.apache.spark.sql.types._
@@ -41,43 +30,7 @@ import org.apache.spark.sql.types._
 import org.apache.comet.DataTypeSupport.isComplexType
 import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
 
-class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
-
-  private var filename: String = null
-
-  /**
-   * We use Asia/Kathmandu because it has a non-zero number of minutes as the offset, so is an
-   * interesting edge case. Also, this timezone tends to be different from the default system
-   * timezone.
-   *
-   * Represents UTC+5:45
-   */
-  private val defaultTimezone = "Asia/Kathmandu"
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    val tempDir = System.getProperty("java.io.tmpdir")
-    filename = s"$tempDir/CometFuzzTestSuite_${System.currentTimeMillis()}.parquet"
-    val random = new Random(42)
-    withSQLConf(
-      CometConf.COMET_ENABLED.key -> "false",
-      SQLConf.SESSION_LOCAL_TIMEZONE.key -> defaultTimezone) {
-      val options =
-        DataGenOptions(
-          generateArray = true,
-          generateStruct = true,
-          generateNegativeZero = false,
-          // override base date due to known issues with experimental scans
-          baseDate =
-            new SimpleDateFormat("YYYY-MM-DD hh:mm:ss").parse("2024-05-25 12:34:56").getTime)
-      ParquetGenerator.makeParquetFile(random, spark, filename, 1000, options)
-    }
-  }
-
-  protected override def afterAll(): Unit = {
-    super.afterAll()
-    FileUtils.deleteDirectory(new File(filename))
-  }
+class CometFuzzTestSuite extends CometFuzzTestBase {
 
   test("select *") {
     val df = spark.read.parquet(filename)
@@ -168,18 +121,6 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }
 
-  test("count distinct") {
-    val df = spark.read.parquet(filename)
-    df.createOrReplaceTempView("t1")
-    for (col <- df.columns) {
-      val sql = s"SELECT count(distinct $col) FROM t1"
-      val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
-        assert(1 == collectNativeScans(cometPlan).length)
-      }
-    }
-  }
-
   test("order by multiple columns") {
     val df = spark.read.parquet(filename)
     df.createOrReplaceTempView("t1")
@@ -192,32 +133,6 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }
 
-  test("aggregate group by single column") {
-    val df = spark.read.parquet(filename)
-    df.createOrReplaceTempView("t1")
-    for (col <- df.columns) {
-      // cannot run fully natively due to range partitioning and sort
-      val sql = s"SELECT $col, count(*) FROM t1 GROUP BY $col ORDER BY $col"
-      val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
-        assert(1 == collectNativeScans(cometPlan).length)
-      }
-    }
-  }
-
-  test("min/max aggregate") {
-    val df = spark.read.parquet(filename)
-    df.createOrReplaceTempView("t1")
-    for (col <- df.columns) {
-      // cannot run fully native due to HashAggregate
-      val sql = s"SELECT min($col), max($col) FROM t1"
-      val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
-        assert(1 == collectNativeScans(cometPlan).length)
-      }
-    }
-  }
-
   test("distribute by single column (complex types)") {
     val df = spark.read.parquet(filename)
     df.createOrReplaceTempView("t1")
@@ -371,36 +286,4 @@ class CometFuzzTestSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }
 
-  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
-      pos: Position): Unit = {
-    Seq("native", "jvm").foreach { shuffleMode =>
-      Seq(
-        CometConf.SCAN_NATIVE_COMET,
-        CometConf.SCAN_NATIVE_DATAFUSION,
-        CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach { scanImpl =>
-        super.test(testName + s" ($scanImpl, $shuffleMode shuffle)", testTags: _*) {
-          withSQLConf(
-            CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl,
-            CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true",
-            CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) {
-            testFun
-          }
-        }
-      }
-    }
-  }
-
-  private def collectNativeScans(plan: SparkPlan): Seq[SparkPlan] = {
-    collect(plan) {
-      case scan: CometScanExec => scan
-      case scan: CometNativeScanExec => scan
-    }
-  }
-
-  private def collectCometShuffleExchanges(plan: SparkPlan): Seq[SparkPlan] = {
-    collect(plan) { case exchange: CometShuffleExchangeExec =>
-      exchange
-    }
-  }
-
 }

From 6f0a12b1381aafaa5137dd4983445ea901628e58 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 16 Sep 2025 09:08:10 -0600
Subject: [PATCH 2/6] Link to count distinct tracking issue

---
 .../test/scala/org/apache/comet/CometFuzzAggregateSuite.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala
index 6c625ae053..6466f8fc29 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala
@@ -26,6 +26,8 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     df.createOrReplaceTempView("t1")
     for (col <- df.columns) {
       val sql = s"SELECT count(distinct $col) FROM t1"
+      // Comet does not support count distinct yet
+      // https://github.com/apache/datafusion-comet/issues/2292
       val (_, cometPlan) = checkSparkAnswer(sql)
       if (usingDataSourceExec) {
         assert(1 == collectNativeScans(cometPlan).length)

From b2eec397ea5dfe47d496ee39816c637d98e07c3e Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 16 Sep 2025 09:19:58 -0600
Subject: [PATCH 3/6] Use DataFusion count_udaf

---
 native/core/src/execution/planner.rs | 25 ++-----------------------
 1 file changed, 2 insertions(+), 23 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 6051a459e3..0e832599d9 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -30,6 +30,7 @@ use crate::{
 use arrow::compute::CastOptions;
 use arrow::datatypes::{DataType, Field, Schema, TimeUnit, DECIMAL128_MAX_PRECISION};
 use datafusion::functions_aggregate::bit_and_or_xor::{bit_and_udaf, bit_or_udaf, bit_xor_udaf};
+use datafusion::functions_aggregate::count::count_udaf;
 use datafusion::functions_aggregate::min_max::max_udaf;
 use datafusion::functions_aggregate::min_max::min_udaf;
 use datafusion::functions_aggregate::sum::sum_udaf;
@@ -1904,35 +1905,13 @@ impl PhysicalPlanner {
         match spark_expr.expr_struct.as_ref().unwrap() {
             AggExprStruct::Count(expr) => {
                 assert!(!expr.children.is_empty());
-                // Using `count_udaf` from Comet is exceptionally slow for some reason, so
-                // as a workaround we translate it to `SUM(IF(expr IS NOT NULL, 1, 0))`
-                // https://github.com/apache/datafusion-comet/issues/744
-
                 let children = expr
                     .children
                     .iter()
                     .map(|child| self.create_expr(child, Arc::clone(&schema)))
                     .collect::<Result<Vec<_>, _>>()?;
 
-                // create `IS NOT NULL expr` and join them with `AND` if there are multiple
-                let not_null_expr: Arc<dyn PhysicalExpr> = children.iter().skip(1).fold(
-                    Arc::new(IsNotNullExpr::new(Arc::clone(&children[0]))) as Arc<dyn PhysicalExpr>,
-                    |acc, child| {
-                        Arc::new(BinaryExpr::new(
-                            acc,
-                            DataFusionOperator::And,
-                            Arc::new(IsNotNullExpr::new(Arc::clone(child))),
-                        ))
-                    },
-                );
-
-                let child = Arc::new(IfExpr::new(
-                    not_null_expr,
-                    Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
-                    Arc::new(Literal::new(ScalarValue::Int64(Some(0)))),
-                ));
-
-                AggregateExprBuilder::new(sum_udaf(), vec![child])
+                AggregateExprBuilder::new(count_udaf(), children)
                     .schema(schema)
                     .alias("count")
                     .with_ignore_nulls(false)

From 714e5952aadf4da4a1dfd2543fbccacc48471fdb Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 16 Sep 2025 10:49:07 -0600
Subject: [PATCH 4/6] Add CometFuzzAggregateSuite to CI

---
 .github/workflows/pr_build_linux.yml | 1 +
 .github/workflows/pr_build_macos.yml | 1 +
 2 files changed, 2 insertions(+)

diff --git a/.github/workflows/pr_build_linux.yml b/.github/workflows/pr_build_linux.yml
index c45355978e..c0cbf8bbef 100644
--- a/.github/workflows/pr_build_linux.yml
+++ b/.github/workflows/pr_build_linux.yml
@@ -102,6 +102,7 @@ jobs:
         - name: "fuzz"
           value: |
             org.apache.comet.CometFuzzTestSuite
+            org.apache.comet.CometFuzzAggregateSuite
             org.apache.comet.DataGeneratorSuite
         - name: "shuffle"
           value: |
diff --git a/.github/workflows/pr_build_macos.yml b/.github/workflows/pr_build_macos.yml
index 6c71006e5f..ea09de06f5 100644
--- a/.github/workflows/pr_build_macos.yml
+++ b/.github/workflows/pr_build_macos.yml
@@ -67,6 +67,7 @@ jobs:
         - name: "fuzz"
           value: |
             org.apache.comet.CometFuzzTestSuite
+            org.apache.comet.CometFuzzAggregateSuite
             org.apache.comet.DataGeneratorSuite
         - name: "shuffle"
           value: |

From ec0c9e2ed538a3db489ffa36847e2b009b901e2a Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 16 Sep 2025 15:17:31 -0600
Subject: [PATCH 5/6] Update aggregate benchmark

---
 .../CometAggregateBenchmark-jdk17-results.txt | 104 ++++++++++++++++++
 .../benchmark/CometAggregateBenchmark.scala   |  36 +-----
 2 files changed, 109 insertions(+), 31 deletions(-)
 create mode 100644 spark/benchmarks/CometAggregateBenchmark-jdk17-results.txt

diff --git a/spark/benchmarks/CometAggregateBenchmark-jdk17-results.txt b/spark/benchmarks/CometAggregateBenchmark-jdk17-results.txt
new file mode 100644
index 0000000000..deb8cc5ac1
--- /dev/null
+++ b/spark/benchmarks/CometAggregateBenchmark-jdk17-results.txt
@@ -0,0 +1,104 @@
+================================================================================================
+Grouped Aggregate (single group key + single aggregate COUNT)
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
+AMD Ryzen 9 7950X3D 16-Core Processor
+Grouped HashAgg Exec: single group key (cardinality 100), single aggregate COUNT:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
+----------------------------------------------------------------------------------------------------
+SQL Parquet - Spark (COUNT)     239     248      15    43.9    22.8    1.0X
+SQL Parquet - Comet (COUNT)     191     195       3    54.9    18.2    1.2X
+
+OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
+AMD Ryzen 9 7950X3D 16-Core Processor
+Grouped HashAgg Exec: single group key (cardinality 1024), single aggregate COUNT:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
+----------------------------------------------------------------------------------------------------
+SQL Parquet - Spark (COUNT)     239     248      15    43.9    22.8    1.0X
+SQL Parquet - Comet (COUNT)     198     202       4    52.8    18.9    1.2X
+
+OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
+AMD Ryzen 9 7950X3D 16-Core Processor
+Grouped HashAgg Exec: single group key (cardinality 1048576), single aggregate COUNT:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
+----------------------------------------------------------------------------------------------------
+SQL Parquet - Spark (COUNT)    2471    2472       1     4.2   235.7    1.0X
+SQL Parquet - Comet (COUNT)    1043    1044       2    10.1    99.4    2.4X
+
+
+================================================================================================
+Grouped Aggregate (multiple group keys + single aggregate COUNT)
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
+AMD Ryzen 9 7950X3D 16-Core Processor
+Grouped HashAgg Exec: multiple group keys (cardinality 100), single aggregate COUNT:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
+----------------------------------------------------------------------------------------------------
+SQL Parquet - Spark (COUNT)     516     532      25    20.3    49.2    1.0X
+SQL Parquet - Comet (COUNT)     263     269       5    39.9    25.0    2.0X
+
+OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
+AMD Ryzen 9 7950X3D 16-Core Processor
+Grouped HashAgg Exec: multiple group keys (cardinality 1024), single aggregate COUNT:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
+----------------------------------------------------------------------------------------------------
+SQL Parquet - Spark (COUNT)    2811    2865      76     3.7   268.0    1.0X
+SQL Parquet - Comet (COUNT)    1228    1236      11     8.5   117.1    2.3X
+
+OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
+AMD Ryzen 9 7950X3D 16-Core Processor
+Grouped HashAgg Exec: multiple group keys (cardinality 1048576), single aggregate COUNT:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
+----------------------------------------------------------------------------------------------------
+SQL Parquet - Spark (COUNT)    5837    5845      11     1.8   556.7    1.0X
+SQL Parquet - Comet (COUNT)    4615    4727     158     2.3   440.2    1.3X
+
+
+================================================================================================
+Grouped Aggregate (single group key + multiple aggregates COUNT)
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
+AMD Ryzen 9 7950X3D 16-Core Processor
+Grouped HashAgg Exec: single group key (cardinality 100), multiple aggregates COUNT:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
+----------------------------------------------------------------------------------------------------
+SQL Parquet - Spark (COUNT)     307     320      27    34.2    29.3    1.0X
+SQL Parquet - Comet (COUNT)     225     233       6    46.5    21.5    1.4X
+
+OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
+AMD Ryzen 9 7950X3D 16-Core Processor
+Grouped HashAgg Exec: single group key (cardinality 1024), multiple aggregates COUNT:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
+----------------------------------------------------------------------------------------------------
+SQL Parquet - Spark (COUNT)     321     335      18    32.7    30.6    1.0X
+SQL Parquet - Comet (COUNT)     240     253      10    43.7    22.9    1.3X
+
+OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
+AMD Ryzen 9 7950X3D 16-Core Processor
+Grouped HashAgg Exec: single group key (cardinality 1048576), multiple aggregates COUNT:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
+----------------------------------------------------------------------------------------------------
+SQL Parquet - Spark (COUNT)    2898    2926      40     3.6   276.4    1.0X
+SQL Parquet - Comet (COUNT)    1440    1445       8     7.3   137.3    2.0X
+
+
+================================================================================================
+Grouped Aggregate (single group key + single aggregate COUNT on decimal)
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
+AMD Ryzen 9 7950X3D 16-Core Processor
+Grouped HashAgg Exec: single group key (cardinality 100), single aggregate COUNT on decimal:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
+----------------------------------------------------------------------------------------------------
+SQL Parquet - Spark (COUNT)     295     309      25    35.5    28.2    1.0X
+SQL Parquet - Comet (COUNT)     176     184       6    59.4    16.8    1.7X
+
+OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
+AMD Ryzen 9 7950X3D 16-Core Processor
+Grouped HashAgg Exec: single group key (cardinality 1024), single aggregate COUNT on decimal:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
+----------------------------------------------------------------------------------------------------
+SQL Parquet - Spark (COUNT)     308     311       4    34.0    29.4    1.0X
+SQL Parquet - Comet (COUNT)     191     196       4    55.0    18.2    1.6X
+
+OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
+AMD Ryzen 9 7950X3D 16-Core Processor
+Grouped HashAgg Exec: single group key (cardinality 1048576), single aggregate COUNT on decimal:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
+----------------------------------------------------------------------------------------------------
+SQL Parquet - Spark (COUNT)    2473    2492      27     4.2   235.9    1.0X
+SQL Parquet - Comet (COUNT)     989    1001      17    10.6    94.3    2.5X
+
+
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala
index 86b59050ec..58052d4d5b 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala
@@ -64,13 +64,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
           spark.sql(query).noop()
         }
 
-        benchmark.addCase(s"SQL Parquet - Comet (Scan) ($aggregateFunction)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(query).noop()
-          }
-        }
-
-        benchmark.addCase(s"SQL Parquet - Comet (Scan, Exec) ($aggregateFunction)") { _ =>
+        benchmark.addCase(s"SQL Parquet - Comet ($aggregateFunction)") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
             CometConf.COMET_EXEC_ENABLED.key -> "true") {
@@ -111,13 +105,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
           spark.sql(query).noop()
         }
 
-        benchmark.addCase(s"SQL Parquet - Comet (Scan) ($aggregateFunction)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(query).noop()
-          }
-        }
-
-        benchmark.addCase(s"SQL Parquet - Comet (Scan, Exec) ($aggregateFunction)") { _ =>
+        benchmark.addCase(s"SQL Parquet - Comet ($aggregateFunction)") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
             CometConf.COMET_EXEC_ENABLED.key -> "true") {
@@ -153,15 +141,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
           spark.sql(query).noop()
         }
 
-        benchmark.addCase(s"SQL Parquet - Comet (Scan) ($aggregateFunction)") { _ =>
-          withSQLConf(
-            CometConf.COMET_ENABLED.key -> "true",
-            CometConf.COMET_MEMORY_OVERHEAD.key -> "1G") {
-            spark.sql(query).noop()
-          }
-        }
-
-        benchmark.addCase(s"SQL Parquet - Comet (Scan, Exec) ($aggregateFunction)") { _ =>
+        benchmark.addCase(s"SQL Parquet - Comet ($aggregateFunction)") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
             CometConf.COMET_EXEC_ENABLED.key -> "true",
@@ -198,13 +178,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
           spark.sql(query).noop()
         }
 
-        benchmark.addCase(s"SQL Parquet - Comet (Scan) ($aggregateFunction)") { _ =>
-          withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
-            spark.sql(query).noop()
-          }
-        }
-
-        benchmark.addCase(s"SQL Parquet - Comet (Scan, Exec) ($aggregateFunction)") { _ =>
+        benchmark.addCase(s"SQL Parquet - Comet ($aggregateFunction)") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
             CometConf.COMET_EXEC_ENABLED.key -> "true") {
@@ -220,7 +194,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
   override def runCometBenchmark(mainArgs: Array[String]): Unit = {
     val total = 1024 * 1024 * 10
     val combinations = List(100, 1024, 1024 * 1024) // number of distinct groups
-    val aggregateFunctions = List("SUM", "MIN", "MAX", "COUNT")
+    val aggregateFunctions = List( /*"SUM", "MIN", "MAX",*/ "COUNT")
 
     aggregateFunctions.foreach { aggFunc =>
       runBenchmarkWithTable(

From a25f0a880850156fb877824c4c8529ee0fe78e6e Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 16 Sep 2025 15:18:25 -0600
Subject: [PATCH 6/6] Remove benchmark results and restore aggregates

---
 .gitignore                                    |   1 +
 .../CometAggregateBenchmark-jdk17-results.txt | 104 ------------------
 .../benchmark/CometAggregateBenchmark.scala   |   2 +-
 3 files changed, 2 insertions(+), 105 deletions(-)
 delete mode 100644 spark/benchmarks/CometAggregateBenchmark-jdk17-results.txt

diff --git a/.gitignore b/.gitignore
index 4157bf6f28..94877ced70 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,4 @@ dev/dist
 apache-rat-*.jar
 venv
 dev/release/comet-rm/workdir
+spark/benchmarks
diff --git a/spark/benchmarks/CometAggregateBenchmark-jdk17-results.txt b/spark/benchmarks/CometAggregateBenchmark-jdk17-results.txt
deleted file mode 100644
index deb8cc5ac1..0000000000
--- a/spark/benchmarks/CometAggregateBenchmark-jdk17-results.txt
+++ /dev/null
@@ -1,104 +0,0 @@
-================================================================================================
-Grouped Aggregate (single group key + single aggregate COUNT)
-================================================================================================
-
-OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
-AMD Ryzen 9 7950X3D 16-Core Processor
-Grouped HashAgg Exec: single group key (cardinality 100), single aggregate COUNT:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------
-SQL Parquet - Spark (COUNT)     239     248      15    43.9    22.8    1.0X
-SQL Parquet - Comet (COUNT)     191     195       3    54.9    18.2    1.2X
-
-OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
-AMD Ryzen 9 7950X3D 16-Core Processor
-Grouped HashAgg Exec: single group key (cardinality 1024), single aggregate COUNT:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------
-SQL Parquet - Spark (COUNT)     239     248      15    43.9    22.8    1.0X
-SQL Parquet - Comet (COUNT)     198     202       4    52.8    18.9    1.2X
-
-OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
-AMD Ryzen 9 7950X3D 16-Core Processor
-Grouped HashAgg Exec: single group key (cardinality 1048576), single aggregate COUNT:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------
-SQL Parquet - Spark (COUNT)    2471    2472       1     4.2   235.7    1.0X
-SQL Parquet - Comet (COUNT)    1043    1044       2    10.1    99.4    2.4X
-
-
-================================================================================================
-Grouped Aggregate (multiple group keys + single aggregate COUNT)
-================================================================================================
-
-OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
-AMD Ryzen 9 7950X3D 16-Core Processor
-Grouped HashAgg Exec: multiple group keys (cardinality 100), single aggregate COUNT:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------
-SQL Parquet - Spark (COUNT)     516     532      25    20.3    49.2    1.0X
-SQL Parquet - Comet (COUNT)     263     269       5    39.9    25.0    2.0X
-
-OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
-AMD Ryzen 9 7950X3D 16-Core Processor
-Grouped HashAgg Exec: multiple group keys (cardinality 1024), single aggregate COUNT:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------
-SQL Parquet - Spark (COUNT)    2811    2865      76     3.7   268.0    1.0X
-SQL Parquet - Comet (COUNT)    1228    1236      11     8.5   117.1    2.3X
-
-OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
-AMD Ryzen 9 7950X3D 16-Core Processor
-Grouped HashAgg Exec: multiple group keys (cardinality 1048576), single aggregate COUNT:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------
-SQL Parquet - Spark (COUNT)    5837    5845      11     1.8   556.7    1.0X
-SQL Parquet - Comet (COUNT)    4615    4727     158     2.3   440.2    1.3X
-
-
-================================================================================================
-Grouped Aggregate (single group key + multiple aggregates COUNT)
-================================================================================================
-
-OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
-AMD Ryzen 9 7950X3D 16-Core Processor
-Grouped HashAgg Exec: single group key (cardinality 100), multiple aggregates COUNT:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------
-SQL Parquet - Spark (COUNT)     307     320      27    34.2    29.3    1.0X
-SQL Parquet - Comet (COUNT)     225     233       6    46.5    21.5    1.4X
-
-OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
-AMD Ryzen 9 7950X3D 16-Core Processor
-Grouped HashAgg Exec: single group key (cardinality 1024), multiple aggregates COUNT:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------
-SQL Parquet - Spark (COUNT)     321     335      18    32.7    30.6    1.0X
-SQL Parquet - Comet (COUNT)     240     253      10    43.7    22.9    1.3X
-
-OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
-AMD Ryzen 9 7950X3D 16-Core Processor
-Grouped HashAgg Exec: single group key (cardinality 1048576), multiple aggregates COUNT:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------
-SQL Parquet - Spark (COUNT)    2898    2926      40     3.6   276.4    1.0X
-SQL Parquet - Comet (COUNT)    1440    1445       8     7.3   137.3    2.0X
-
-
-================================================================================================
-Grouped Aggregate (single group key + single aggregate COUNT on decimal)
-================================================================================================
-
-OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
-AMD Ryzen 9 7950X3D 16-Core Processor
-Grouped HashAgg Exec: single group key (cardinality 100), single aggregate COUNT on decimal:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------
-SQL Parquet - Spark (COUNT)     295     309      25    35.5    28.2    1.0X
-SQL Parquet - Comet (COUNT)     176     184       6    59.4    16.8    1.7X
-
-OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
-AMD Ryzen 9 7950X3D 16-Core Processor
-Grouped HashAgg Exec: single group key (cardinality 1024), single aggregate COUNT on decimal:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------
-SQL Parquet - Spark (COUNT)     308     311       4    34.0    29.4    1.0X
-SQL Parquet - Comet (COUNT)     191     196       4    55.0    18.2    1.6X
-
-OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
-AMD Ryzen 9 7950X3D 16-Core Processor
-Grouped HashAgg Exec: single group key (cardinality 1048576), single aggregate COUNT on decimal:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
-----------------------------------------------------------------------------------------------------
-SQL Parquet - Spark (COUNT)    2473    2492      27     4.2   235.9    1.0X
-SQL Parquet - Comet (COUNT)     989    1001      17    10.6    94.3    2.5X
-
-
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala
index 58052d4d5b..47fbe354f5 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometAggregateBenchmark.scala
@@ -194,7 +194,7 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
   override def runCometBenchmark(mainArgs: Array[String]): Unit = {
     val total = 1024 * 1024 * 10
     val combinations = List(100, 1024, 1024 * 1024) // number of distinct groups
-    val aggregateFunctions = List( /*"SUM", "MIN", "MAX",*/ "COUNT")
+    val aggregateFunctions = List("SUM", "MIN", "MAX", "COUNT")
 
     aggregateFunctions.foreach { aggFunc =>
       runBenchmarkWithTable(