[GLUTEN-5438] feat: Dynamically sizing off-heap memory#5439

Merged
ulysses-you merged 8 commits into apache:main from supermem613:dynamic-offheap-sizing
May 17, 2024
Conversation

@supermem613
Contributor

What changes were proposed in this pull request?

Today, in Spark we specify the on-heap and off-heap memory sizes as a configuration value read at the beginning of executing a job.

With this change, we introduce a new feature enabled by a new spark.gluten.memory.dynamic.offHeap.sizing.enabled setting. When this setting is configured to true, Gluten ignores the off-heap size setting and instead sizes the off-heap region to match the spark.executor.memory setting.

We then enforce a total memory quota, calculated as the sum of the memory committed and in use in the Java heap (computed as Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) plus the off-heap memory tracked in TreeMemoryConsumer. When an allocation would push us over this total amount of committed memory, we fail the allocation and trigger an OOM.
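The quota check described above can be sketched roughly as follows. This is an illustrative stand-in, not the actual Gluten code: the class name QuotaCheckSketch is hypothetical, and the 4 GiB budget is a placeholder for whatever spark.executor.memory is set to.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative quota checker: fail an off-heap allocation when
// on-heap usage + tracked off-heap usage + the request exceeds the budget.
final class QuotaCheckSketch {
    // Assumed total budget; stands in for the value of spark.executor.memory.
    static final long MAX_MEMORY_IN_BYTES = 4L * 1024 * 1024 * 1024;
    // Off-heap bytes tracked as native allocations are reported to Gluten.
    static final AtomicLong USED_OFFHEAP_BYTES = new AtomicLong(0);

    // Returns true if an off-heap allocation of `size` bytes fits the quota.
    static boolean canBorrow(long size) {
        Runtime rt = Runtime.getRuntime();
        long usedOnHeapBytes = rt.totalMemory() - rt.freeMemory();
        long usedOffHeapBytesNow = USED_OFFHEAP_BYTES.get();
        if (size + usedOffHeapBytesNow + usedOnHeapBytes > MAX_MEMORY_IN_BYTES) {
            return false; // would exceed the quota: fail and let the caller OOM
        }
        USED_OFFHEAP_BYTES.addAndGet(size);
        return true;
    }

    public static void main(String[] args) {
        // A 64 MB request easily fits a 4 GiB budget on a fresh JVM.
        System.out.println(canBorrow(64L * 1024 * 1024)); // prints true
    }
}
```

Note how the on-heap side is only observed, never reserved: this is the root of the oversubscription caveat discussed below.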

Note that with this change, we perform the "quota check" when the native engine reports an allocation to Gluten. In practice, this means the Java codebase can oversubscribe memory as it allocates: a heap allocation within the on-heap quota will still succeed even when off-heap usage is high enough that the combined total should fail it.

A test exercising this setting is part of this change.

Fixes: #5438

How was this patch tested?

Manual testing with Spark and included test

@github-actions

Thanks for opening a pull request!

Could you open an issue for this pull request on GitHub Issues?

https://github.com/apache/incubator-gluten/issues

Then could you also rename the commit message and pull request title in the following format?

[GLUTEN-${ISSUES_ID}][COMPONENT]feat/fix: ${detailed message}

@github-actions

Run Gluten Clickhouse CI

@supermem613 supermem613 changed the title Dynamically sizing off-heap memory [GLUTEN-5438] feat: Dynamically sizing off-heap memory Apr 17, 2024
@github-actions

#5438

@FelixYBW
Contributor

Thank you for your PR! @zhanglistar may do a test quickly

Member

@zhztheplayer zhztheplayer left a comment


Thank you for raising this.

Though I could think about some limitations within the implementation:

  1. When the heap grows through real JVM heap allocations, the JVM does not respect Gluten's fake heap accounting, so the overall RSS of the process may exceed the limit unexpectedly;
  2. Spill is still triggered only by the maximum off-heap limit;

I have only read the code, so I may be missing something; please add anything you know of.

And I know it's challenging to have a perfect solution for topics like this, so if we decide to provide this feature, I'd suggest we document the known limitations in code or doc files (we can also add some experimental marks).

Also you can refer to some previous discussions if you missed them #3055 (comment).


import java.util.concurrent.atomic.AtomicLong;

public final class DynamicOffHeapSizingPolicyChecker {
Member

@zhztheplayer zhztheplayer Apr 18, 2024


Is it possible to make this policy checker an individual MemoryTarget? Then we could add it to the target pipeline in NativeMemoryManager#createNativeMemoryManager and reduce the code complexity of TreeMemoryConsumer.

Contributor Author


Will investigate.

Contributor Author


Can you please elaborate your suggestion a bit? The approach here leans on the policy checker being static, accounting for used off-heap bytes across all memory targets (say, if memory isolation is on). Are you suggesting a new TreeMemoryTarget or something that is an alternative to it, which presumably would have similar complexity since we have to support, say, memory isolation?

Member

@zhztheplayer zhztheplayer Apr 19, 2024


Are you suggesting a new TreeMemoryTarget or something that is an alternative to it, which presumably would have similar complexity since we have to support, say, memory isolation?

We don't have to add something as heavy as TreeMemoryTarget. If I understand correctly, DynamicOffHeapSizingPolicyChecker was designed to be a global singleton, right? If so, we can make the policy checker extend the API https://github.com/apache/incubator-gluten/blob/main/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTarget.java#L23 and have the allocations accounted for in a global static counter in the policy checker.

Just like the logging memory target https://github.com/apache/incubator-gluten/blob/main/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/LoggingMemoryTarget.java, which logs allocations through its static LOGGER object and delegates all allocations to the subsequent target.
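The delegation pattern suggested here might look roughly like the sketch below. The MemoryTarget interface shown is a simplified stand-in for Gluten's actual API, and OffHeapSizingMemoryTarget is a hypothetical name; only the shape of the idea (delegate the call, account the result in a global static counter) is taken from the comment.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for Gluten's MemoryTarget API, assumed for this sketch.
interface MemoryTarget {
    long borrow(long size); // reserve up to `size` bytes; returns bytes reserved
    long repay(long size);  // release up to `size` bytes; returns bytes released
}

// Hypothetical checker-as-target: delegate every call and keep a global
// static counter of tracked off-heap bytes.
final class OffHeapSizingMemoryTarget implements MemoryTarget {
    private static final AtomicLong USED_OFFHEAP_BYTES = new AtomicLong(0);
    private final MemoryTarget delegated;

    OffHeapSizingMemoryTarget(MemoryTarget delegated) {
        this.delegated = delegated;
    }

    @Override
    public long borrow(long size) {
        long reserved = delegated.borrow(size);
        USED_OFFHEAP_BYTES.addAndGet(reserved); // account what was really reserved
        return reserved;
    }

    @Override
    public long repay(long size) {
        long unreserved = delegated.repay(size);
        USED_OFFHEAP_BYTES.addAndGet(-unreserved);
        return unreserved;
    }

    static long usedOffHeapBytes() {
        return USED_OFFHEAP_BYTES.get();
    }

    public static void main(String[] args) {
        // A trivial delegate that always reserves/releases the full amount.
        MemoryTarget target = new OffHeapSizingMemoryTarget(new MemoryTarget() {
            public long borrow(long size) { return size; }
            public long repay(long size) { return size; }
        });
        target.borrow(100);
        target.repay(40);
        System.out.println(usedOffHeapBytes()); // prints 60
    }
}
```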


DynamicOffHeapSizingPolicyChecker() {}

public boolean canBorrow(long size) {
Contributor


This code says that when native requires memory, we lend on-heap memory to it. But when on-heap later requires memory, how can on-heap get the actual memory before native repays? I think it would be killed by YARN or k8s because it uses double the memory at that point. And it's a very general case, e.g., when there are stages running in parallel or many queries running in parallel.

Contributor Author


I believe you are referring to the caveat raised above that "it is possible that the Java codebase can oversubscribe memory as it allocates, which is under the on-heap quota, although there is enough off-heap usage where we should fail the allocation". Feedback was also given above that this should be documented in the code as well to be very explicit about the risk. I will do that.

Contributor


Oversubscribing memory is quite risky. I think we should at least respect the Spark approach. The max execution memory in Spark is:

(spark.executor.memory - 300MB) * spark.memory.fraction

totalMemory() - freeMemory() breaks this idea. Even if execution is offloaded to native, many things still depend on on-heap memory, e.g., the metadata (plan, split, map status, events...), broadcast blocks, RDD cache. One thought is that we'd better follow Spark and change it to:

(totalMemory() - freeMemory() - 300MB) * spark.memory.fraction
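As a worked example of the proposed budget, assuming Spark's default spark.memory.fraction of 0.6 and its 300 MB reserved system memory, with a 4 GiB executor heap (the class and method names below are illustrative, not Spark's or Gluten's actual code):

```java
// Worked example of the Spark-aligned budget formula.
final class SparkBudgetExample {
    // Spark reserves 300 MB of heap (RESERVED_SYSTEM_MEMORY_BYTES).
    static final long RESERVED_BYTES = 300L * 1024 * 1024;

    // (heap - 300MB) * spark.memory.fraction
    static long maxExecutionBytes(long heapBytes, double memoryFraction) {
        return (long) ((heapBytes - RESERVED_BYTES) * memoryFraction);
    }

    public static void main(String[] args) {
        long heap = 4L * 1024 * 1024 * 1024; // 4 GiB executor heap
        // (4096 MB - 300 MB) * 0.6 = 2277.6 MB
        System.out.println(maxExecutionBytes(heap, 0.6) / (1024 * 1024) + " MB"); // prints 2277 MB
    }
}
```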

Contributor Author


Agreed. With the proposed change, we treat the "max on-heap" as the spark.executor.memory setting, but that goes against Spark's practice of setting memory aside accordingly. I'll change the proposed logic to follow what Spark has determined as the max on-heap.

@github-actions

github-actions bot commented May 3, 2024

Run Gluten Clickhouse CI

supermem613 and others added 2 commits May 6, 2024 15:08
* Make it clear the proposed feature is experimental;
* Document the caveat of possible memory over-subscription;
* Use a new MemoryTarget class instead of changing TreeMemoryConsumer;
* Take into consideration Spark's approach of sizing the on-heap with a reserved 300MB and memory fraction;
@github-actions

github-actions bot commented May 6, 2024

Run Gluten Clickhouse CI

@github-actions

github-actions bot commented May 6, 2024

Run Gluten Clickhouse CI

Comment thread gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java Outdated
* Added function as asked;
* After further experimentation, decided to introduce an (optional) aggressive config that allows for changing the memory fraction used in the total memory calculation, but without changing Spark's memory fraction. This allows for fine-tuning / more aggressive usage if so desired;
* Fixed typos and missing spaces in some of the exception and feature doc text;
* Removed unnecessary test settings that do not impact the feature to speed things up a bit;
@github-actions

github-actions bot commented May 8, 2024

Run Gluten Clickhouse CI

Comment thread shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala Outdated
long usedOnHeapBytes = (totalMemory - freeMemory);
long usedOffHeapBytesNow = USED_OFFHEAP_BYTES.get();

if (size + usedOffHeapBytesNow + usedOnHeapBytes > MAX_MEMORY_IN_BYTES) {
Contributor


Shall we return MAX_MEMORY_IN_BYTES - (size + usedOffHeapBytesNow + usedOnHeapBytes) ?

Contributor Author


I don't think so - we were asked to allocate a specific amount of memory. The check here is for what's allowed overall, and returning that value would deviate from current behavior.


@Override
public long repay(long size) {
long unreserved = delegated.repay(size);
Contributor


IIUC, after calling repay, it is possible that the unreserved memory is from vanilla Spark rather than native. Is it expected for USED_OFFHEAP_BYTES ?

Contributor


@zhztheplayer, is my understanding right?

Contributor Author


I don't believe this is possible. None of the repay/borrow flow here comes from vanilla Spark.

@github-actions

Run Gluten Clickhouse CI


Member

@zhztheplayer zhztheplayer left a comment


LGTM for now % the unresolved comments. cc @ulysses-you If you have more.

I'm actually still a little concerned about the totalMemory - freeMemory calculation. But you can proceed with more integration tests to see whether it leads to problems.

So overall I think the next step is to see more test results for this feature. Do you have a plan for that @supermem613? And anyway, we can give the feature some time for more development iterations.

@ulysses-you
Contributor

I'm fine with this PR, looking forward to some best practices. Thank you @supermem613 @zhztheplayer

Member

@zhztheplayer zhztheplayer left a comment


Thanks!

@ulysses-you ulysses-you merged commit db8496b into apache:main May 17, 2024
@GlutenPerfBot
Contributor

===== Performance report for TPCH SF2000 with Velox backend, for reference only ====

query log/native_5439_time.csv log/native_master_05_13_2024_33f993554_time.csv difference percentage
q1 35.79 36.42 0.634 101.77%
q2 23.46 23.70 0.232 100.99%
q3 38.97 36.61 -2.359 93.95%
q4 38.60 37.83 -0.769 98.01%
q5 68.38 70.62 2.231 103.26%
q6 7.09 7.47 0.382 105.39%
q7 81.94 85.08 3.134 103.82%
q8 83.31 84.30 0.994 101.19%
q9 121.28 121.00 -0.281 99.77%
q10 47.24 45.37 -1.878 96.03%
q11 20.38 19.49 -0.895 95.61%
q12 26.39 26.52 0.130 100.49%
q13 53.79 55.08 1.290 102.40%
q14 18.64 21.53 2.894 115.53%
q15 28.90 29.02 0.115 100.40%
q16 13.97 14.00 0.026 100.19%
q17 102.15 103.62 1.469 101.44%
q18 145.97 146.24 0.271 100.19%
q19 15.28 13.35 -1.932 87.36%
q20 26.28 29.06 2.778 110.57%
q21 284.19 282.54 -1.641 99.42%
q22 14.65 14.61 -0.034 99.77%
total 1296.65 1303.44 6.791 100.52%

Comment on lines +210 to +213
// If we are using dynamic off-heap sizing, we should also enable off-heap memory
// officially.
if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) {
conf.set(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, "true")
Contributor


I'm a bit confused here: even if the user sets spark.memory.offHeap.enabled=false, it will still be enabled when dynamic off-heap sizing is enabled?

I'm not sure whether Runtime#freeMemory accounts for memory allocated through the Unsafe class, which means that if we re-enable Spark off-heap memory, that portion of memory will not be accounted for within the JVM. In other words, Runtime#freeMemory does not account for Spark off-heap memory usage.

I tried asking GPT, and the answer was:

The memory allocated through Unsafe#allocateMemory is not accounted for by the Runtime#freeMemory method.

Contributor


I'll try to offer an explanation since nobody has acked here. The author can correct me if anything is wrong.

In short, spark.memory.offHeap.size is not used if spark.gluten.memory.dynamic.offHeap.sizing.enabled is enabled.

The magic of this PR is that it says we can leverage the power of the JVM to control both on-heap and off-heap memory. We only need to configure spark.executor.memory, which is the unified memory for on-heap and off-heap. I think the core idea is that if there is enough free on-heap memory, we can borrow it (a logical borrow) to the off-heap side and prevent that on-heap memory from being allocated.
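This "logical borrow" can be illustrated with made-up numbers (all names and values below are hypothetical, chosen only to show the arithmetic):

```java
// Made-up numbers illustrating the unified budget: free on-heap room is
// "lent" to the off-heap side instead of configuring spark.memory.offHeap.size.
final class UnifiedBudgetExample {
    // Remaining room the off-heap side may still grow into, in MB.
    static long headroomMb(long budgetMb, long usedOnHeapMb, long usedOffHeapMb) {
        return budgetMb - usedOnHeapMb - usedOffHeapMb;
    }

    public static void main(String[] args) {
        long budgetMb = 8 * 1024;      // pretend spark.executor.memory = 8g
        long usedOnHeapMb = 3 * 1024;  // JVM heap committed and in use
        long usedOffHeapMb = 2 * 1024; // native allocations tracked by Gluten
        // Off-heap can still grow by 3072 MB before an allocation would fail.
        System.out.println(headroomMb(budgetMb, usedOnHeapMb, usedOffHeapMb) + " MB"); // prints 3072 MB
    }
}
```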

Contributor


@ulysses-you thank you for the explanation.

if there is enough free on-heap memory, we can borrow it (a logical borrow) to the off-heap side and prevent that on-heap memory from being allocated.

I get the core idea, but I don't understand why it can totally prevent on-heap memory allocation. Would you give more explanation? Thank you!

Contributor


As far as I can see, it cannot. Actually, I have asked this question before. For now, we add a fraction to preserve some memory for on-heap; see the references.

@Heatao

Heatao commented Jul 11, 2024

I'm also a bit confused here: if it cannot prevent on-heap memory allocation, will this prevent GC from being triggered? The JVM thinks it can still allocate memory, so it does not trigger GC, resulting in the process being killed by k8s or YARN.

@zhztheplayer
Member

I'm also a bit confused here: if it cannot prevent on-heap memory allocation, will this prevent GC from being triggered? The JVM thinks it can still allocate memory, so it does not trigger GC, resulting in the process being killed by k8s or YARN.

That's a case this approach doesn't handle yet. The faked on-heap allocations from this feature don't count toward the JVM's heap size counter, so they never trigger GC. I assume this feature isn't production-ready until that is improved.

// If we are using dynamic off-heap sizing, we should also enable off-heap memory
// officially.
if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) {
conf.set(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, "true")
Contributor


If GLUTEN_OFFHEAP_ENABLED is set, Spark will request that portion of memory, so the total memory requested is approximately spark.executor.memory + spark.memory.offHeap.size. However, with this feature, spark.executor.memory is managed uniformly for on-heap and off-heap memory. So the final effect is that Spark requests spark.executor.memory + spark.memory.offHeap.size from the resource manager, but this feature only uses a portion of that memory. In other words, it seems to be limiting itself? @supermem613


