Merged
@@ -37,10 +37,10 @@ import scala.collection.mutable.ArrayBuffer

 /**
- * This rule is used to dynamic adjust stage resource profile for following purposes:
- * 1. swap offheap and onheap memory size when whole stage fallback happened 2. increase executor
- * heap memory if stage contains gluten operator and spark operator at the same time. Note: we
- * don't support set resource profile for final stage now. Todo: support set resource profile
- * for final stage.
+ * This rule dynamically adjusts the stage resource profile for the following purposes:
+ * 1. Decrease offheap and increase onheap memory size when whole-stage fallback happens; 2.
+ * Increase executor heap memory if the stage contains both Gluten and Spark operators at the
+ * same time. Note: setting a resource profile for the final stage is not supported yet. Todo:
+ * support it.
  */
@Experimental
case class GlutenAutoAdjustStageResourceProfile(glutenConf: GlutenConfig, spark: SparkSession)
@@ -88,6 +88,11 @@ case class GlutenAutoAdjustStageResourceProfile(glutenConf: GlutenConfig, spark:
val newExecutorMemory =
new ExecutorResourceRequest(ResourceProfile.MEMORY, newMemoryAmount.toLong)
executorResource.put(ResourceProfile.MEMORY, newExecutorMemory)

val newExecutorOffheap =
new ExecutorResourceRequest(ResourceProfile.OFFHEAP_MEM, offheapRequest.get.amount / 10)
Member Author:
@zjuwangg, could you take a look? This is just an empirical setting, and I am not sure which setting is better.

Contributor:
LGTM for the whole-stage fallback case!

Contributor:
This would work only for Spark 3.5.4 or higher, per apache/spark#48963.
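Since adjusting `OFFHEAP_MEM` in a stage-level resource profile only takes effect on Spark 3.5.4 or later (per the linked Spark PR), callers may want to gate the adjustment on the runtime Spark version. The sketch below is purely illustrative: `SparkVersionGuard` and `supportsOffheapAdjust` are hypothetical names, not Gluten or Spark APIs, and the version parsing is deliberately simplified.

```scala
// Illustrative guard: only apply the offheap adjustment on Spark >= 3.5.4,
// since earlier versions do not honor OFFHEAP_MEM changes in stage-level
// resource profiles (per apache/spark#48963). Names and parsing are
// simplified assumptions, not real Gluten/Spark identifiers.
object SparkVersionGuard {
  def supportsOffheapAdjust(version: String): Boolean = {
    // Parse up to three numeric components; missing components default to 0.
    val nums = version
      .split("\\.")
      .map(_.takeWhile(_.isDigit))
      .filter(_.nonEmpty)
      .map(_.toInt)
    val maj = nums.lift(0).getOrElse(0)
    val min = nums.lift(1).getOrElse(0)
    val pat = nums.lift(2).getOrElse(0)
    (maj, min, pat) match {
      case (m, _, _) if m > 3 => true   // 4.x and above
      case (3, n, _) if n > 5 => true   // 3.6.x and above
      case (3, 5, p)          => p >= 4 // 3.5.4 and above
      case _                  => false
    }
  }
}
```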

Contributor:
@philo-he Should we consider making this setting configurable?

Member Author:
@JkSelf, let's first use this empirical fixed setting and wait for feedback from the user side. In theory, users don't need to configure much offheap memory in the stage-fallback case. Thanks!

Member:
@philo-he
Thanks for improving on this! Just to confirm: I can see the code logic reduces the offheap memory size to 1/10, but the on-heap memory size is not increased. Is this intended?

Thanks, -yuan

Member Author:
@zhouyuan, the on-heap memory is increased by specifying a larger value for ResourceProfile.MEMORY prior to the off-heap adjustment. If I have missed something, please let me know. Thanks!

executorResource.put(ResourceProfile.OFFHEAP_MEM, newExecutorOffheap)

val newRP = new ResourceProfile(executorResource.toMap, taskResource.toMap)
return applyNewResourceProfileIfPossible(plan, newRP, rpManager)
}
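The net effect of the diff above can be sketched as plain arithmetic, independent of Spark's `ResourceProfile` machinery: heap memory is scaled up and offheap is cut to one tenth of its original request. In the sketch below, `ExecutorMem`, `adjustForFallback`, and the default `heapScaleFactor` of 2.0 are illustrative assumptions, not Gluten names or values; only the divide-offheap-by-10 step comes directly from the diff.

```scala
// Sketch of the whole-stage-fallback memory adjustment. The 1/10 offheap
// reduction mirrors the diff above; the heap scale factor is a hypothetical
// stand-in for however the new heap amount is actually computed.
object MemoryAdjustSketch {
  final case class ExecutorMem(heapMiB: Long, offheapMiB: Long)

  def adjustForFallback(orig: ExecutorMem, heapScaleFactor: Double = 2.0): ExecutorMem =
    ExecutorMem(
      heapMiB = (orig.heapMiB * heapScaleFactor).toLong, // grow on-heap
      offheapMiB = orig.offheapMiB / 10                  // shrink off-heap to 1/10
    )
}
```

For example, an executor requesting 4 GiB heap and 8 GiB offheap would, under these assumed values, end up with 8 GiB heap and roughly 0.8 GiB offheap after a whole-stage fallback.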