36 changes: 36 additions & 0 deletions core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -122,6 +122,16 @@ public class TaskMemoryManager {
*/
private volatile long acquiredButNotUsed = 0L;

/**
* Peak off heap memory usage by this task.
*/
private volatile long peakOffHeapMemory = 0L;

/**
* Peak on heap memory usage by this task.
*/
private volatile long peakOnHeapMemory = 0L;

/**
* Construct a new TaskMemoryManager.
*/
@@ -202,6 +212,17 @@ public long acquireExecutionMemory(long required, MemoryConsumer requestingConsu
logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got),
requestingConsumer);
}

// The consumer updates its used memory after acquireExecutionMemory returns, so we need
// to add `got` here to compute the current peak.
long currentPeak = consumers.stream().filter(c -> c.getMode() == mode)
.mapToLong(MemoryConsumer::getUsed).sum() + got;
if (mode == MemoryMode.OFF_HEAP) {
peakOffHeapMemory = Math.max(peakOffHeapMemory, currentPeak);
} else {
peakOnHeapMemory = Math.max(peakOnHeapMemory, currentPeak);
}

return got;
}
}
@@ -507,4 +528,19 @@ public long getMemoryConsumptionForThisTask() {
public MemoryMode getTungstenMemoryMode() {
return tungstenMemoryMode;
}

/**
* Returns peak task-level on-heap memory usage in bytes.
*/
public long getPeakOnHeapExecutionMemory() {
return peakOnHeapMemory;
}

/**
* Returns peak task-level off-heap memory usage in bytes.
*/
public long getPeakOffHeapExecutionMemory() {
return peakOffHeapMemory;
}
}
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -41,6 +41,8 @@ private[spark] object InternalAccumulator {
val MEMORY_BYTES_SPILLED = METRICS_PREFIX + "memoryBytesSpilled"
val DISK_BYTES_SPILLED = METRICS_PREFIX + "diskBytesSpilled"
val PEAK_EXECUTION_MEMORY = METRICS_PREFIX + "peakExecutionMemory"
val PEAK_ON_HEAP_EXECUTION_MEMORY = METRICS_PREFIX + "peakOnHeapExecutionMemory"
val PEAK_OFF_HEAP_EXECUTION_MEMORY = METRICS_PREFIX + "peakOffHeapExecutionMemory"
val UPDATED_BLOCK_STATUSES = METRICS_PREFIX + "updatedBlockStatuses"
val TEST_ACCUM = METRICS_PREFIX + "testAccumulator"

2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -706,6 +706,8 @@ private[spark] class Executor(
task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
task.metrics.setResultSerializationTime(TimeUnit.NANOSECONDS.toMillis(
afterSerializationNs - beforeSerializationNs))
task.metrics.setPeakOnHeapExecutionMemory(taskMemoryManager.getPeakOnHeapExecutionMemory)
task.metrics.setPeakOffHeapExecutionMemory(taskMemoryManager.getPeakOffHeapExecutionMemory)
// Expose task metrics using the Dropwizard metrics system.
// Update task metrics counters
executorSource.METRIC_CPU_TIME.inc(task.metrics.executorCpuTime)
21 changes: 21 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -58,6 +58,8 @@ class TaskMetrics private[spark] () extends Serializable {
private val _memoryBytesSpilled = new LongAccumulator
private val _diskBytesSpilled = new LongAccumulator
private val _peakExecutionMemory = new LongAccumulator
private val _peakOnHeapExecutionMemory = new LongAccumulator
private val _peakOffHeapExecutionMemory = new LongAccumulator
private val _updatedBlockStatuses = new CollectionAccumulator[(BlockId, BlockStatus)]

/**
@@ -111,9 +113,22 @@ class TaskMetrics private[spark] () extends Serializable {
* joins. The value of this accumulator should be approximately the sum of the peak sizes
* across all such data structures created in this task. For SQL jobs, this only tracks all
* unsafe operators and ExternalSort.
* This is not equal to peakOnHeapExecutionMemory + peakOffHeapExecutionMemory.
*/
// TODO: SPARK-48789: the naming is confusing since this does not really reflect the whole
// execution memory. We'd better deprecate this once we have a replacement.
def peakExecutionMemory: Long = _peakExecutionMemory.sum

Comment thread:

Contributor:
How about we change its implementation to be peakOnHeapExecutionMemory + peakOffHeapExecutionMemory? The current implementation doesn't make much sense due to https://github.com/apache/spark/pull/47192/files#r1692144786

Contributor (Author):
Yes, I think we can plan this breaking change for Spark 4.0.

@mridulm (Aug 8, 2024):
peakOnHeapExecutionMemory and peakOffHeapExecutionMemory can peak at different times, so we can't replace it with the sum.

Contributor:
Oh I see, makes sense. Let's leave it then.
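
To make that concrete, here is a minimal sketch of the scenario @mridulm describes, modeled on the helpers used in MemoryManagerSuite at the bottom of this diff (the memoryManager, consumers, and sizes are hypothetical; the manager is assumed to have at least 500 bytes available in each mode):

val tmm = new TaskMemoryManager(memoryManager, 0)
val offHeap = new TestMemoryConsumer(tmm, MemoryMode.OFF_HEAP)
val onHeap = new TestMemoryConsumer(tmm, MemoryMode.ON_HEAP)

tmm.acquireExecutionMemory(500L, offHeap)  // off-heap peak reaches 500
tmm.releaseExecutionMemory(500L, offHeap)  // off-heap usage drops back to 0
tmm.acquireExecutionMemory(400L, onHeap)   // on-heap peak reaches 400

// getPeakOffHeapExecutionMemory (500) + getPeakOnHeapExecutionMemory (400) is 900,
// but the task never held more than 500 bytes at any single instant, so summing
// the two per-mode peaks can overstate a combined peak.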


/**
* Peak on heap execution memory as tracked by TaskMemoryManager.
*/
def peakOnHeapExecutionMemory: Long = _peakOnHeapExecutionMemory.sum

/**
* Peak off heap execution memory as tracked by TaskMemoryManager.
*/
def peakOffHeapExecutionMemory: Long = _peakOffHeapExecutionMemory.sum

Comment thread:

Contributor:
Discuss:
Is it required that peakExecutionMemory <= peakOnHeapExecutionMemory + peakOffHeapExecutionMemory?
Any cases where this might get violated?

I am trying to reason about the completeness of these metrics (given we want to eventually deprecate the existing one).
I expect the above to hold, but want to make sure I am not missing anything.
+CC @JoshRosen

Contributor (Author):
> peakExecutionMemory <= peakOnHeapExecutionMemory + peakOffHeapExecutionMemory?

I think yes, because TaskMemoryManager.acquireExecutionMemory is the only narrow waist for any execution memory acquisition, and the peaks are maintained there.

In contrast, the legacy peakExecutionMemory is maintained by individual operators (join, agg, sort), so it is entirely up to each operator's implementation.

Contributor:
+1, I agree that peakExecutionMemory <= peakOnHeapExecutionMemory + peakOffHeapExecutionMemory should hold: if we trace through the existing callers of incPeakExecutionMemory, it looks like all of the usages flow from counts that correspond to the acquireExecutionMemory waist.

/**
* Storage statuses of any blocks that have been updated as a result of this task.
*
@@ -141,6 +156,10 @@ class TaskMetrics private[spark] () extends Serializable {
private[spark] def setResultSerializationTime(v: Long): Unit =
_resultSerializationTime.setValue(v)
private[spark] def setPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.setValue(v)
private[spark] def setPeakOnHeapExecutionMemory(v: Long): Unit =
_peakOnHeapExecutionMemory.setValue(v)
private[spark] def setPeakOffHeapExecutionMemory(v: Long): Unit =
_peakOffHeapExecutionMemory.setValue(v)
private[spark] def incMemoryBytesSpilled(v: Long): Unit = _memoryBytesSpilled.add(v)
private[spark] def incDiskBytesSpilled(v: Long): Unit = _diskBytesSpilled.add(v)
private[spark] def incPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.add(v)
Expand Down Expand Up @@ -227,6 +246,8 @@ class TaskMetrics private[spark] () extends Serializable {
MEMORY_BYTES_SPILLED -> _memoryBytesSpilled,
DISK_BYTES_SPILLED -> _diskBytesSpilled,
PEAK_EXECUTION_MEMORY -> _peakExecutionMemory,
PEAK_ON_HEAP_EXECUTION_MEMORY -> _peakOnHeapExecutionMemory,
PEAK_OFF_HEAP_EXECUTION_MEMORY -> _peakOffHeapExecutionMemory,
UPDATED_BLOCK_STATUSES -> _updatedBlockStatuses,
shuffleRead.REMOTE_BLOCKS_FETCHED -> shuffleReadMetrics._remoteBlocksFetched,
shuffleRead.LOCAL_BLOCKS_FETCHED -> shuffleReadMetrics._localBlocksFetched,
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -597,6 +597,8 @@ private[spark] object JsonProtocol extends JsonUtils {
g.writeNumberField("Executor Run Time", taskMetrics.executorRunTime)
g.writeNumberField("Executor CPU Time", taskMetrics.executorCpuTime)
g.writeNumberField("Peak Execution Memory", taskMetrics.peakExecutionMemory)
g.writeNumberField("Peak On Heap Execution Memory", taskMetrics.peakOnHeapExecutionMemory)
g.writeNumberField("Peak Off Heap Execution Memory", taskMetrics.peakOffHeapExecutionMemory)
g.writeNumberField("Result Size", taskMetrics.resultSize)
g.writeNumberField("JVM GC Time", taskMetrics.jvmGCTime)
g.writeNumberField("Result Serialization Time", taskMetrics.resultSerializationTime)
@@ -1254,6 +1256,10 @@ private[spark] object JsonProtocol extends JsonUtils {
// The "Peak Execution Memory" field was added in Spark 3.0.0:
metrics.setPeakExecutionMemory(
jsonOption(json.get("Peak Execution Memory")).map(_.extractLong).getOrElse(0))
metrics.setPeakOnHeapExecutionMemory(
jsonOption(json.get("Peak On Heap Execution Memory")).map(_.extractLong).getOrElse(0))
metrics.setPeakOffHeapExecutionMemory(
jsonOption(json.get("Peak Off Heap Execution Memory")).map(_.extractLong).getOrElse(0))
metrics.setResultSize(json.get("Result Size").extractLong)
metrics.setJvmGCTime(json.get("JVM GC Time").extractLong)
metrics.setResultSerializationTime(json.get("Result Serialization Time").extractLong)
18 changes: 18 additions & 0 deletions core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -335,6 +335,24 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
tMemManager.releaseExecutionMemory(500L, c)
assert(tMemManager.getMemoryConsumptionForThisTask === 0L)
}

test("task peak execution memory usage") {
val memoryManager = createMemoryManager(
maxOnHeapExecutionMemory = 1000L,
maxOffHeapExecutionMemory = 1000L)

val tMemManager = new TaskMemoryManager(memoryManager, 1)
val offHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.OFF_HEAP)
val onHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.ON_HEAP)

val result1 = tMemManager.acquireExecutionMemory(500L, offHeapConsumer)
val result2 = tMemManager.acquireExecutionMemory(400L, onHeapConsumer)
assert(result1 === 500L)
assert(result2 === 400L)
assert(tMemManager.getMemoryConsumptionForThisTask === 900L)
assert(tMemManager.getPeakOnHeapExecutionMemory === 400L)
assert(tMemManager.getPeakOffHeapExecutionMemory === 500L)
}
}

private object MemoryManagerSuite {