Allow Spark partial / Comet final for compatible aggregates #2994

Closed
Shekharrajak wants to merge 14 commits into apache:main from Shekharrajak:fix/issue-2894-aggregate-fallback

Conversation

@Shekharrajak
Contributor

Which issue does this PR close?

Closes #2894

Rationale for this change

Comet currently falls back to Spark for all final hash aggregates when there is no Comet partial aggregate in the child plan. This is overly conservative, because some aggregates use intermediate buffer formats that are compatible between Spark and Comet.
For example, MIN, MAX, COUNT, and the bitwise aggregates (BIT_AND, BIT_OR, BIT_XOR) each keep a simple single-value intermediate buffer that is compatible between Spark and Comet. These can safely run with "Spark partial / Comet final" execution.
Other aggregates such as SUM, AVG, and VARIANCE have known incompatibilities (e.g., differences in decimal overflow handling, complex intermediate buffers) and should continue to fall back when there is no Comet partial aggregate.
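
To make the buffer argument concrete, here is a minimal sketch in plain Scala. It uses no Spark or Comet classes; `PartialFinalSketch`, `AvgBuffer`, and the function names are hypothetical and exist only for illustration. It shows that a MIN partial result is a single value that any engine can merge, whereas an AVG partial result is a (sum, count) pair, so both sides would have to agree on its exact layout and types.

```scala
// Illustrative only: stand-in code, not Spark or Comet classes.
object PartialFinalSketch {
  // MIN: the partial state for a group is a single optional value.
  def partialMin(partition: Seq[Int]): Option[Int] =
    partition.reduceOption(_ min _)

  // The final stage only needs to merge single values.
  def finalMin(buffers: Seq[Option[Int]]): Option[Int] =
    buffers.flatten.reduceOption(_ min _)

  // AVG: the partial state has two fields, so the producer and the
  // consumer must agree on the layout and types of both.
  final case class AvgBuffer(sum: Long, count: Long)

  def partialAvg(partition: Seq[Int]): AvgBuffer =
    AvgBuffer(partition.map(_.toLong).sum, partition.size.toLong)

  def finalAvg(buffers: Seq[AvgBuffer]): Option[Double] = {
    val total = buffers.foldLeft(AvgBuffer(0L, 0L)) { (a, b) =>
      AvgBuffer(a.sum + b.sum, a.count + b.count)
    }
    if (total.count == 0L) None else Some(total.sum.toDouble / total.count)
  }
}
```

In this toy model, the partial functions play the role of the Spark partial aggregate and the final functions play the role of the Comet final aggregate. The real incompatibilities named above (such as decimal overflow handling) are not modeled here.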

What changes are included in this PR?

Added a supportsSparkPartialCometFinal method to the CometAggregateExpressionSerde trait. The default is false.

Added a helper function, aggSupportsMixedExecution(), in QueryPlanSerde.
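
The shape of this change can be sketched roughly as follows. Only the names supportsSparkPartialCometFinal and aggSupportsMixedExecution come from this PR; the `AggSerde` trait, the serde objects, the name-keyed registry, and the helper's signature are assumptions made so that the example is self-contained. The actual code in CometAggregateExpressionSerde and QueryPlanSerde may look different.

```scala
// A sketch under stated assumptions, not the actual Comet code.
object MixedExecutionSketch {
  // Hypothetical stand-in for the CometAggregateExpressionSerde trait.
  trait AggSerde {
    // Opt-in flag: conservative default, as described in the PR.
    def supportsSparkPartialCometFinal: Boolean = false
  }

  // Aggregates with a single-value buffer opt in.
  object MinSerde extends AggSerde {
    override def supportsSparkPartialCometFinal: Boolean = true
  }
  object MaxSerde extends AggSerde {
    override def supportsSparkPartialCometFinal: Boolean = true
  }
  object CountSerde extends AggSerde {
    override def supportsSparkPartialCometFinal: Boolean = true
  }
  // SUM keeps the default and therefore still falls back.
  object SumSerde extends AggSerde

  // Hypothetical registry keyed by aggregate name.
  val serdeByName: Map[String, AggSerde] = Map(
    "min" -> MinSerde,
    "max" -> MaxSerde,
    "count" -> CountSerde,
    "sum" -> SumSerde)

  // Assumed signature: mixed execution is allowed only if every
  // aggregate in the operator is known and has opted in.
  def aggSupportsMixedExecution(aggNames: Seq[String]): Boolean =
    aggNames.forall(n => serdeByName.get(n).exists(_.supportsSparkPartialCometFinal))
}
```

Defaulting the flag to false means that a newly added aggregate falls back to Spark until someone has verified its buffer format and explicitly opted in.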

How are these changes tested?

"CometExecRule should not allow Spark partial and Comet final for unsafe aggregates" - Verifies SUM still falls back to Spark

"CometExecRule should allow Spark partial and Comet final for safe aggregates" - Verifies MIN/MAX/COUNT can use Comet final with Spark partial

@Shekharrajak Shekharrajak force-pushed the fix/issue-2894-aggregate-fallback branch from f2e6748 to 51869b1 Compare December 27, 2025 09:42
@codecov-commenter

codecov-commenter commented Dec 27, 2025

Codecov Report

❌ Patch coverage is 53.84615% with 6 lines in your changes missing coverage. Please review.
✅ Project coverage is 54.58%. Comparing base (f09f8af) to head (51869b1).
⚠️ Report is 803 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...main/scala/org/apache/comet/serde/aggregates.scala | 50.00% | 3 Missing ⚠️ |
| .../scala/org/apache/comet/serde/QueryPlanSerde.scala | 33.33% | 1 Missing and 1 partial ⚠️ |
| ...n/scala/org/apache/spark/sql/comet/operators.scala | 66.66% | 0 Missing and 1 partial ⚠️ |

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #2994      +/-   ##
============================================
- Coverage     56.12%   54.58%   -1.54%     
- Complexity      976     1256     +280     
============================================
  Files           119      167      +48     
  Lines         11743    15505    +3762     
  Branches       2251     2571     +320     
============================================
+ Hits           6591     8464    +1873     
- Misses         4012     5822    +1810     
- Partials       1140     1219      +79     


@Shekharrajak Shekharrajak force-pushed the fix/issue-2894-aggregate-fallback branch from 91e0de7 to 274f38b Compare January 22, 2026 16:19
@andygrove
Member

Sorry for the late review, @Shekharrajak. This LGTM except for the missing end-to-end tests for bitwise aggregates that @parthchandra already pointed out.

I will go ahead and add those tests and push to this branch if permissions allow, or create a new branch from this one.

Something is wrong with the git history on this branch so I cannot rebase or upmerge.

@Shekharrajak let me know if you are still interested in working on this. If not, I will create a new PR based on your changes.

@Shekharrajak
Contributor Author

Thanks for checking. Let me work on this.

@Shekharrajak Shekharrajak force-pushed the fix/issue-2894-aggregate-fallback branch 2 times, most recently from 863ba03 to 141ba57 Compare March 16, 2026 19:20
Member

@andygrove andygrove left a comment

LGTM pending CI. Thanks @Shekharrajak

@andygrove
Member

@Shekharrajak golden files now need updating because more aggregates can run natively 🎉

@Shekharrajak Shekharrajak force-pushed the fix/issue-2894-aggregate-fallback branch from 4eaef6b to a8d6c0f Compare March 28, 2026 06:22
@Shekharrajak
Contributor Author

Regenerated the golden files using

 ./dev/regenerate-golden-files.sh --spark-version 4.0

 ./dev/regenerate-golden-files.sh --spark-version 3.5

@andygrove
Member

This seems to have exposed a bug:

[info] - group-by.sql *** FAILED *** (6 seconds, 481 milliseconds)
[info]   group-by.sql
[info]   Expected Some("struct<count(1):bigint>"), but got Some("struct<>") Schema did not match for query #82
[info]   SELECT count(*)
[info]   FROM VALUES (ARRAY(MAP(1, 2, 2, 3), MAP(1, 3))), (ARRAY(MAP(2, 3), MAP(1, 3))), (ARRAY(MAP(2, 3, 1, 2), MAP(1, 3))) as t(a)
[info]   GROUP BY a: -- !query
[info]   SELECT count(*)
[info]   FROM VALUES (ARRAY(MAP(1, 2, 2, 3), MAP(1, 3))), (ARRAY(MAP(2, 3), MAP(1, 3))), (ARRAY(MAP(2, 3, 1, 2), MAP(1, 3))) as t(a)
[info]   GROUP BY a
[info]   -- !query schema
[info]   struct<>
[info]   -- !query output
[info]   org.apache.comet.CometNativeException
[info]   Not yet implemented: Row format support not yet implemented for: [SortField { options: SortOptions { descending: false, nulls_first: true }, data_type: List(Field { data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "key", data_type: Int32 }, Field { name: "value", data_type: Int32 }]) }, false) }) }] (SQLQueryTestSuite.scala:679)

@Shekharrajak
Contributor Author

Looks like this failure is connected to the open ticket #2837: the test case groups by an array of maps.

Let me try a fix that handles array-of-map groupingExpressions.

@Shekharrajak
Contributor Author

The previous check only rejected top-level MapType grouping expressions.
As a result, grouping by ARRAY(MAP(...)) crashed with a CometNativeException,
because Comet native reports 'Row format support not yet implemented'
for List(Map(...)).

The fix is to check recursively for MapType within ArrayType and StructType.
This ensures we correctly fall back to Spark for any grouping expression
that contains a MapType at any nesting level.
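
The recursive check can be sketched as below. To stay self-contained, the example defines its own small `DataType` hierarchy instead of using Spark's org.apache.spark.sql.types classes, so the case classes here are simplified stand-ins (for instance, this `StructType` holds field types directly). The actual code in the PR operates on Spark's types and may differ in detail.

```scala
// A sketch with stand-in types, not Spark's DataType classes.
object MapTypeCheckSketch {
  sealed trait DataType
  case object IntType extends DataType
  final case class ArrayType(elementType: DataType) extends DataType
  final case class MapType(keyType: DataType, valueType: DataType) extends DataType
  // Simplified: holds field types only, without field names.
  final case class StructType(fields: Seq[DataType]) extends DataType

  // Returns true if a MapType appears at any nesting level.
  def containsMapType(dt: DataType): Boolean = dt match {
    case _: MapType             => true
    case ArrayType(elementType) => containsMapType(elementType)
    case StructType(fields)     => fields.exists(containsMapType)
    case _                      => false
  }
}
```

With a check like this, a grouping expression of type ArrayType(MapType(IntType, IntType)), which corresponds to the ARRAY(MAP(...)) case in the failing query, is detected and can fall back to Spark instead of reaching the native row-format code.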

@Shekharrajak Shekharrajak force-pushed the fix/issue-2894-aggregate-fallback branch from ff7bcf9 to b5e0c71 Compare March 30, 2026 07:31
@Shekharrajak
Contributor Author

Locally, the CometSqlFileTestSuite and AdaptiveQueryExecSuite tests all pass now.

})) {
def containsMapType(dt: DataType): Boolean = dt match {
case _: MapType => true
case a: ArrayType => containsMapType(a.elementType)
Contributor Author

Some queries group by an array of maps, so this checks recursively for MapType within ArrayType and StructType.
This ensures we correctly fall back to Spark for any grouping expression
that contains a MapType at any nesting level.

@andygrove
Member

Looks like some Spark diffs now need updating:

2026-03-30T10:34:11.5550470Z [info] - SPARK-29894 test Codegen Stage Id in SparkPlanInfo *** FAILED *** (12 milliseconds)
2026-03-30T10:34:11.5552926Z [info]   "[CometNativeColumnarToRow]" did not equal "[WholeStageCodegen (2)]" (SQLAppStatusListenerSuite.scala:706)

@Shekharrajak Shekharrajak force-pushed the fix/issue-2894-aggregate-fallback branch from 4eec927 to 34bee9b Compare March 30, 2026 16:17
@Shekharrajak Shekharrajak force-pushed the fix/issue-2894-aggregate-fallback branch from 34bee9b to ba19751 Compare March 31, 2026 03:28
@Shekharrajak
Contributor Author

Struggling a bit to get all checks passing in GitHub CI. I keep missing one test or another when I run them locally to verify that everything is fine.

@Shekharrajak
Contributor Author

The tests below assert on Spark-internal AQE optimization behavior (empty relation propagation, partition coalescing) that legitimately doesn't work when Comet's native operators are in the plan. The IgnoreCometNativeDataFusion annotations skip these tests only in native_datafusion mode.

SPARK-35442: Support propagate empty relation through aggregate
SPARK-35442: Support propagate empty relation through union
SPARK-34980: Support coalesce partition through union

@Shekharrajak Shekharrajak force-pushed the fix/issue-2894-aggregate-fallback branch 5 times, most recently from 8d4b8ae to 3089f35 Compare April 1, 2026 16:51
@Shekharrajak Shekharrajak force-pushed the fix/issue-2894-aggregate-fallback branch from 3089f35 to 91684b2 Compare April 1, 2026 17:11
@Shekharrajak Shekharrajak force-pushed the fix/issue-2894-aggregate-fallback branch from 91684b2 to 74ddc04 Compare April 1, 2026 17:20
@Shekharrajak
Contributor Author

Debugging the CI check failure:

df1: count(DISTINCT 2), count(DISTINCT 2, 3) | [1, 1] -- PASS
df2: count(DISTINCT 2), count(DISTINCT 3, 2) | [2, 2] -- FAIL

Seems related to #3835

@Shekharrajak
Contributor Author

Found the issue in the CI checks: #3881

…egate-fallback

# Conflicts:
#	dev/diffs/3.4.3.diff
#	dev/diffs/4.0.1.diff
#	spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@Shekharrajak Shekharrajak force-pushed the fix/issue-2894-aggregate-fallback branch from 85872c9 to a660456 Compare April 3, 2026 03:58
@Shekharrajak
Contributor Author

Looks fine now.

@andygrove
Member

There is a newer PR, #4015, which was partly inspired by this one, so I will close this one. Thanks again @Shekharrajak for working on this.

@andygrove andygrove closed this Apr 25, 2026

Development

Successfully merging this pull request may close these issues.

Comet falls back to Spark for final hash aggregate in some cases when it could be supported

4 participants