[SPARK-48628][CORE] Add task peak on/off heap memory metrics #47776
liuzqt wants to merge 3 commits into apache:master from
Conversation
### What changes were proposed in this pull request?

Add task on/off heap execution memory metrics to `TaskMetrics`, tracked in `TaskMemoryManager`, **assuming `acquireExecutionMemory` is the single narrow waist for acquiring execution memory.**

### Why are the changes needed?

Currently there are no task on/off heap execution memory metrics. There is an existing [peakExecutionMemory](https://github.com/apache/spark/blob/3cd35f8cb6462051c621cf49de54b9c5692aae1d/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala#L114) metric, but its semantics are confusing: it only covers the execution memory used by shuffle/join/aggregate/sort, which is accumulated in specific operators and thus does not reflect the real execution memory. Therefore it is necessary to add these two metrics.

I also created two follow-up sub-tickets:

- https://issues.apache.org/jira/browse/SPARK-48788 : accumulate task metrics per stage, and display them in the Spark UI
- https://issues.apache.org/jira/browse/SPARK-48789 : deprecate `peakExecutionMemory` once we have a replacement for it

The ultimate goal is to have these two metrics ready (including as accumulated stage metrics in the Spark UI) and then deprecate `peakExecutionMemory`.

### Does this PR introduce _any_ user-facing change?

Supposedly no. But the two follow-up sub-tickets will have user-facing changes: new metrics exposed in the Spark UI, and deprecation of the old metric.

### How was this patch tested?

New test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47192 from liuzqt/SPARK-48628.

Authored-by: Ziqi Liu <ziqi.liu@databricks.com>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
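The "narrow waist" idea in the description can be sketched as follows. This is a minimal, hypothetical Java sketch, not Spark's actual `TaskMemoryManager` implementation; class and method names are illustrative. The point is that if every execution-memory acquisition flows through a single method, the peak on/off heap values can be updated exactly there.

```java
// Hypothetical sketch: track peak on/off heap execution memory at a single
// acquisition "narrow waist". Illustrative names only; not Spark's actual code.
public class PeakMemorySketch {
    private long currentOnHeap = 0;
    private long currentOffHeap = 0;
    private long peakOnHeap = 0;
    private long peakOffHeap = 0;

    // All acquisitions flow through this method, so the peaks stay exact.
    public synchronized long acquireExecutionMemory(long required, boolean onHeap) {
        // Assume the request is always fully granted in this sketch.
        if (onHeap) {
            currentOnHeap += required;
            peakOnHeap = Math.max(peakOnHeap, currentOnHeap);
        } else {
            currentOffHeap += required;
            peakOffHeap = Math.max(peakOffHeap, currentOffHeap);
        }
        return required;
    }

    public synchronized void releaseExecutionMemory(long size, boolean onHeap) {
        if (onHeap) {
            currentOnHeap -= size;
        } else {
            currentOffHeap -= size;
        }
    }

    public synchronized long getPeakOnHeap() { return peakOnHeap; }
    public synchronized long getPeakOffHeap() { return peakOffHeap; }
}
```

Note that releases only lower the current counters; the peaks are monotonically non-decreasing, which matches the "peak" semantics the new metrics report.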
Thank you, @liuzqt .
Please run ExternalAppendOnlyUnsafeRowArrayBenchmark and attach the result into this PR.
You can run the benchmark in your repository on your branch.
For the detail, please see Running benchmarks in your forked repository section of the Apache Spark developer-tools guideline.
@dongjoon-hyun updated the benchmark result, didn't see regression.

Oh, you need to make it a part of this commit instead of the PR description. Please commit the result for both Java 17 and Java 21 in the following files, @liuzqt.

Updated the benchmark result in the PR.

Thank you for attaching the result. The result looks good to me too, @liuzqt.

cc @cloud-fan, @Ngone51, @JoshRosen from #47192, too

Hi @dongjoon-hyun @mridulm @Ngone51 could you please help merge this PR? Thanks!
JoshRosen
left a comment
LGTM as well: I think that this is a performance-sensitive enough path that it makes sense to incrementally maintain a separate count vs. repeatedly re-aggregating it, even if this comes at the price of some additional code. This code is comparatively low churn so I'm not especially worried about the risks of it going out of sync in refactorings.
I've merged this to master (4.0.0). Thanks @liuzqt!

Thank you, @liuzqt and all!
What changes were proposed in this pull request?
This PR is trying to revive #47192, which was reverted due to a regression in `ExternalAppendOnlyUnsafeRowArrayBenchmark`.

**Root cause**

We eventually decided to aggregate peak memory usage from all consumers on each `acquireExecutionMemory` invocation (see this discussion), which is O(n) complexity, where n is the number of consumers. `ExternalAppendOnlyUnsafeRowArrayBenchmark` is implemented in a way that all iterations run in a single task context, so the number of consumers explodes.

Notice that `TaskMemoryManager.consumers` is never cleaned up over the whole task lifecycle, and `TaskMemoryManager.acquireExecutionMemory` is a very frequent operation, so doing a linear-complexity (in the number of consumers) operation there is not a good choice. This benchmark might be a corner case, but it is still possible to have a large number of consumers in a large query plan.

I fell back to the previous implementation: maintain the current execution memory with an extra lock. cc @Ngone51
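The regression mechanism can be illustrated with a toy sketch (hypothetical names; not Spark's actual `TaskMemoryManager` code): if each acquisition re-sums usage across all registered consumers, the per-call cost grows linearly with the consumer count, and a long-lived task context that keeps registering consumers makes that pathological.

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration of the reverted O(n)-per-acquire approach: every
// acquisition re-aggregates usage across all consumers. Hypothetical
// names; not Spark's actual implementation.
public class ReaggregatingSketch {
    // One usage cell per consumer; consumers are registered but never
    // removed, mirroring how TaskMemoryManager.consumers is never cleaned
    // up during a task's lifecycle.
    private final List<long[]> consumerUsage = new ArrayList<>();
    private long peak = 0;

    public int registerConsumer() {
        consumerUsage.add(new long[] {0});
        return consumerUsage.size() - 1;
    }

    // O(n) in the number of consumers, paid on every single acquisition.
    public void acquire(int consumerId, long bytes) {
        consumerUsage.get(consumerId)[0] += bytes;
        long total = 0;
        for (long[] usage : consumerUsage) {
            total += usage[0];
        }
        peak = Math.max(peak, total);
    }

    public long getPeak() { return peak; }
}
```

By contrast, the fallback described above keeps a single running total under a lock, so each acquisition updates the peak in O(1) regardless of how many consumers the task has accumulated.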
Benchmark result
ExternalAppendOnlyUnsafeRowArrayBenchmark-results
ExternalAppendOnlyUnsafeRowArrayBenchmark-jdk21-results
Why are the changes needed?
Does this PR introduce any user-facing change?
NO
How was this patch tested?
New unit tests.
Was this patch authored or co-authored using generative AI tooling?
NO