From 2c59644fafd7e2f3f21d9957f2e52a30a39f5073 Mon Sep 17 00:00:00 2001
From: Ziqi Liu
Date: Fri, 14 Jun 2024 10:41:30 -0700
Subject: [PATCH 1/9] add task peak on/off heap memory metrics

---
 .../spark/memory/TaskMemoryManager.java       | 33 +++++++++++++++++++
 .../apache/spark/InternalAccumulator.scala    |  2 ++
 .../org/apache/spark/executor/Executor.scala  |  2 ++
 .../apache/spark/executor/TaskMetrics.scala   | 14 ++++++++
 .../apache/spark/memory/MemoryManager.scala   | 16 +++++++++
 .../spark/memory/MemoryManagerSuite.scala     | 22 +++++++++++++
 .../apache/spark/util/JsonProtocolSuite.scala |  2 ++
 7 files changed, 91 insertions(+)

diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index fe798e40a6ad7..39479d53fa665 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -122,6 +122,16 @@ public class TaskMemoryManager {
    */
   private volatile long acquiredButNotUsed = 0L;
 
+  /**
+   * Peak off heap memory usage by this task.
+   */
+  private long peakTaskOffHeapMemory = 0L;
+
+  /**
+   * Peak on heap memory usage by this task.
+   */
+  private long peakTaskOnHeapMemory = 0L;
+
   /**
    * Construct a new TaskMemoryManager.
    */
@@ -202,6 +212,14 @@ public long acquireExecutionMemory(long required, MemoryConsumer requestingConsu
         logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got),
           requestingConsumer);
       }
+
+      if (mode == MemoryMode.OFF_HEAP) {
+        peakTaskOffHeapMemory = Math.max(peakTaskOffHeapMemory,
+          memoryManager.getOffHeapExecutionMemoryUsageForTask(taskAttemptId));
+      } else {
+        peakTaskOnHeapMemory = Math.max(peakTaskOnHeapMemory,
+          memoryManager.getOnHeapExecutionMemoryUsageForTask(taskAttemptId));
+      }
       return got;
     }
   }
@@ -507,4 +525,19 @@ public long getMemoryConsumptionForThisTask() {
   public MemoryMode getTungstenMemoryMode() {
     return tungstenMemoryMode;
   }
+
+  /**
+   * Returns peak task-level on-heap memory usage in bytes.
+   *
+   */
+  public long getPeakOnHeapExecutionMemory() {
+    return peakTaskOnHeapMemory;
+  }
+
+  /**
+   * Returns peak task-level off-heap memory usage in bytes.
+   */
+  public long getPeakOffHeapExecutionMemory() {
+    return peakTaskOffHeapMemory;
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
index ef4609e6d6456..505634d5bb048 100644
--- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -41,6 +41,8 @@ private[spark] object InternalAccumulator {
   val MEMORY_BYTES_SPILLED = METRICS_PREFIX + "memoryBytesSpilled"
   val DISK_BYTES_SPILLED = METRICS_PREFIX + "diskBytesSpilled"
   val PEAK_EXECUTION_MEMORY = METRICS_PREFIX + "peakExecutionMemory"
+  val PEAK_ON_HEAP_EXECUTION_MEMORY = METRICS_PREFIX + "peakOnHeapExecutionMemory"
+  val PEAK_OFF_HEAP_EXECUTION_MEMORY = METRICS_PREFIX + "peakOffHeapExecutionMemory"
   val UPDATED_BLOCK_STATUSES = METRICS_PREFIX + "updatedBlockStatuses"
   val TEST_ACCUM = METRICS_PREFIX + "testAccumulator"
 
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 586a8a7db28a3..4fc2d82ac4afe 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -706,6 +706,8 @@ private[spark] class Executor(
         task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
         task.metrics.setResultSerializationTime(TimeUnit.NANOSECONDS.toMillis(
           afterSerializationNs - beforeSerializationNs))
+        task.metrics.setPeakOnHeapExecutionMemory(taskMemoryManager.getPeakOnHeapExecutionMemory)
+        task.metrics.setPeakOffHeapExecutionMemory(taskMemoryManager.getPeakOffHeapExecutionMemory)
         // Expose task metrics using the Dropwizard metrics system.
         // Update task metrics counters
         executorSource.METRIC_CPU_TIME.inc(task.metrics.executorCpuTime)
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 263de0121f7c7..a181ff9418db5 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -57,6 +57,8 @@ class TaskMetrics private[spark] () extends Serializable {
   private val _memoryBytesSpilled = new LongAccumulator
   private val _diskBytesSpilled = new LongAccumulator
   private val _peakExecutionMemory = new LongAccumulator
+  private val _peakOnHeapExecutionMemory = new LongAccumulator
+  private val _peakOffHeapExecutionMemory = new LongAccumulator
   private val _updatedBlockStatuses = new CollectionAccumulator[(BlockId, BlockStatus)]
 
   /**
@@ -111,8 +113,14 @@ class TaskMetrics private[spark] () extends Serializable {
    * across all such data structures created in this task. For SQL jobs, this only tracks all
    * unsafe operators and ExternalSort.
    */
+  // TODO: SPARK-48789: the naming is confusing since this does not really reflect the whole
+  // execution memory. We'd better deprecate this once we have a replacement.
   def peakExecutionMemory: Long = _peakExecutionMemory.sum
 
+  def peakOnHeapExecutionMemory: Long = _peakOnHeapExecutionMemory.sum
+
+  def peakOffHeapExecutionMemory: Long = _peakOffHeapExecutionMemory.sum
+
   /**
    * Storage statuses of any blocks that have been updated as a result of this task.
    *
@@ -140,6 +148,10 @@ class TaskMetrics private[spark] () extends Serializable {
   private[spark] def setResultSerializationTime(v: Long): Unit =
     _resultSerializationTime.setValue(v)
   private[spark] def setPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.setValue(v)
+  private[spark] def setPeakOnHeapExecutionMemory(v: Long): Unit =
+    _peakOnHeapExecutionMemory.setValue(v)
+  private[spark] def setPeakOffHeapExecutionMemory(v: Long): Unit =
+    _peakOffHeapExecutionMemory.setValue(v)
   private[spark] def incMemoryBytesSpilled(v: Long): Unit = _memoryBytesSpilled.add(v)
   private[spark] def incDiskBytesSpilled(v: Long): Unit = _diskBytesSpilled.add(v)
   private[spark] def incPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.add(v)
@@ -221,6 +233,8 @@ class TaskMetrics private[spark] () extends Serializable {
     MEMORY_BYTES_SPILLED -> _memoryBytesSpilled,
     DISK_BYTES_SPILLED -> _diskBytesSpilled,
     PEAK_EXECUTION_MEMORY -> _peakExecutionMemory,
+    PEAK_ON_HEAP_EXECUTION_MEMORY -> _peakOnHeapExecutionMemory,
+    PEAK_OFF_HEAP_EXECUTION_MEMORY -> _peakOffHeapExecutionMemory,
     UPDATED_BLOCK_STATUSES -> _updatedBlockStatuses,
     shuffleRead.REMOTE_BLOCKS_FETCHED -> shuffleReadMetrics._remoteBlocksFetched,
     shuffleRead.LOCAL_BLOCKS_FETCHED -> shuffleReadMetrics._localBlocksFetched,
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index c33fca35764e5..058cc5a5e9ead 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -219,6 +219,22 @@ private[spark] abstract class MemoryManager(
     offHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId)
   }
 
+  /**
+   * Return the on-heap execution memory consumption, in bytes, for the given task.
+   */
+  private[memory] def getOnHeapExecutionMemoryUsageForTask(taskAttemptId: Long)
+    : Long = synchronized {
+    onHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId)
+  }
+
+  /**
+   * Return the off-heap execution memory consumption, in bytes, for the given task.
+   */
+  private[memory] def getOffHeapExecutionMemoryUsageForTask(taskAttemptId: Long)
+    : Long = synchronized {
+    offHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId)
+  }
+
   // -- Fields related to Tungsten managed memory -------------------------------------------------
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index a6f1707a1aabf..9d568c4a60c0d 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -335,6 +335,28 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
     tMemManager.releaseExecutionMemory(500L, c)
     assert(tMemManager.getMemoryConsumptionForThisTask === 0L)
   }
+
+  test("task peak execution memory usage") {
+    val memoryManager = createMemoryManager(
+      maxOnHeapExecutionMemory = 1000L,
+      maxOffHeapExecutionMemory = 1000L)
+
+    val tMemManager = new TaskMemoryManager(memoryManager, 1)
+    val offHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.OFF_HEAP)
+    val onHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.ON_HEAP)
+
+    val result1 = Future {
+      tMemManager.acquireExecutionMemory(500L, offHeapConsumer)
+    }
+    val result2 = Future {
+      tMemManager.acquireExecutionMemory(400L, onHeapConsumer)
+    }
+    assert(ThreadUtils.awaitResult(result1, 200.millis) === 500L)
+    assert(ThreadUtils.awaitResult(result2, 200.millis) === 400L)
+    assert(tMemManager.getMemoryConsumptionForThisTask === 900L)
+    assert(tMemManager.getPeakOnHeapExecutionMemory === 400L)
+    assert(tMemManager.getPeakOffHeapExecutionMemory === 500L)
+  }
 }
 
 private object MemoryManagerSuite {
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 3eae1b3278e74..764451a834dce 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -1453,6 +1453,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
     t.setExecutorRunTime(b)
     t.setExecutorCpuTime(b)
     t.setPeakExecutionMemory(c)
+    t.setPeakOnHeapExecutionMemory(c)
+    t.setPeakOffHeapExecutionMemory(c)
    t.setResultSize(c)
     t.setJvmGCTime(d)
     t.setResultSerializationTime(a + b)
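The mechanism in this first patch is easy to state on its own: after every successful acquisition, the task memory manager reads back the task's current usage for the relevant memory mode and folds it into a running maximum, which releases never lower. A standalone sketch of that bookkeeping, using hypothetical names rather than the Spark classes above:

```java
// Running-peak bookkeeping reduced to its essentials. RunningPeak is a
// hypothetical stand-in for the fields added to TaskMemoryManager above,
// not Spark API.
public class RunningPeak {
  private long peakBytes = 0L;

  // Called with the task's current usage right after a successful acquire.
  public void update(long currentUsageBytes) {
    peakBytes = Math.max(peakBytes, currentUsageBytes);
  }

  public long get() {
    return peakBytes;
  }

  public static void main(String[] args) {
    RunningPeak peak = new RunningPeak();
    peak.update(500L);  // acquire 500 bytes
    peak.update(900L);  // acquire 400 more; current usage is now 900
    peak.update(400L);  // a release lowers current usage, not the peak
    System.out.println(peak.get());  // 900
  }
}
```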
From 672d0582b210133dba9c46ed3214ed5a36f6401d Mon Sep 17 00:00:00 2001
From: Ziqi Liu
Date: Tue, 2 Jul 2024 22:03:50 -0700
Subject: [PATCH 2/9] fix test

---
 .../apache/spark/util/JsonProtocolSuite.scala | 64 +++++++++++--------
 1 file changed, 39 insertions(+), 25 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 764451a834dce..5c14aa6cd3ce9 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -2685,6 +2685,20 @@ private[spark] object JsonProtocolSuite extends Assertions {
       | },
       | {
       | "ID": 10,
+      | "Name": "$PEAK_ON_HEAP_EXECUTION_MEMORY",
+      | "Update": 500,
+      | "Internal": true,
+      | "Count Failed Values": true
+      | },
+      | {
+      | "ID": 11,
+      | "Name": "$PEAK_OFF_HEAP_EXECUTION_MEMORY",
+      | "Update": 500,
+      | "Internal": true,
+      | "Count Failed Values": true
+      | },
+      | {
+      | "ID": 12,
       | "Name": "$UPDATED_BLOCK_STATUSES",
       | "Update": [
       | {
@@ -2706,175 +2720,175 @@ private[spark] object JsonProtocolSuite extends Assertions {
       | "Count Failed Values": true
       | },
       | {
-      | "ID": 11,
+      | "ID": 13,
       | "Name": "${shuffleRead.REMOTE_BLOCKS_FETCHED}",
       | "Update": 0,
       | "Internal": true,
       | "Count Failed Values": true
       | },
       | {
-      | "ID": 12,
+      | "ID": 14,
       | "Name": "${shuffleRead.LOCAL_BLOCKS_FETCHED}",
       | "Update": 0,
       | "Internal": true,
       | "Count Failed Values": true
       | },
       | {
-      | "ID": 13,
+      | "ID": 15,
       | "Name": "${shuffleRead.REMOTE_BYTES_READ}",
       | "Update": 0,
       | "Internal": true,
       | "Count Failed Values": true
       | },
       | {
-      | "ID": 14,
+      | "ID": 16,
       | "Name": "${shuffleRead.REMOTE_BYTES_READ_TO_DISK}",
       | "Update": 0,
       | "Internal": true,
       | "Count Failed Values": true
       | },
       | {
-      | "ID": 15,
+      | "ID": 17,
       | "Name": "${shuffleRead.LOCAL_BYTES_READ}",
       | "Update": 0,
       | "Internal": true,
       | "Count Failed Values": true
       | },
       | {
-      | "ID": 16,
+      | "ID": 18,
       | "Name": "${shuffleRead.FETCH_WAIT_TIME}",
       | "Update": 0,
       | "Internal": true,
       | "Count Failed Values": true
       | },
       | {
-      | "ID": 17,
+      | "ID": 19,
       | "Name": "${shuffleRead.RECORDS_READ}",
       | "Update": 0,
       | "Internal": true,
       | "Count Failed Values": true
       | },
       | {
-      | "ID": 18,
+      | "ID": 20,
       | "Name": "${shuffleRead.CORRUPT_MERGED_BLOCK_CHUNKS}",
       | "Update": 0,
       | "Internal": true,
       | "Count Failed Values": true
       | },
       | {
-      | "ID": 19,
+      | "ID": 21,
       | "Name": "${shuffleRead.MERGED_FETCH_FALLBACK_COUNT}",
       | "Update": 0,
       | "Internal": true,
       | "Count Failed Values": true
       | },
       | {
-      | "ID" : 20,
+      | "ID" : 22,
       | "Name" : "${shuffleRead.REMOTE_MERGED_BLOCKS_FETCHED}",
       | "Update" : 0,
       | "Internal" : true,
       | "Count Failed Values" : true
       | },
       | {
-      | "ID" : 21,
+      | "ID" : 23,
       | "Name" : "${shuffleRead.LOCAL_MERGED_BLOCKS_FETCHED}",
       | "Update" : 0,
       | "Internal" : true,
       | "Count Failed Values" : true
       | },
       | {
-      | "ID" : 22,
+      | "ID" : 24,
       | "Name" : "${shuffleRead.REMOTE_MERGED_CHUNKS_FETCHED}",
       | "Update" : 0,
       | "Internal" : true,
       | "Count Failed Values" : true
       | },
       | {
-      | "ID" : 23,
+      | "ID" : 25,
       | "Name" : "${shuffleRead.LOCAL_MERGED_CHUNKS_FETCHED}",
       | "Update" : 0,
       | "Internal" : true,
       | "Count Failed Values" : true
       | },
       | {
-      | "ID" : 24,
+      | "ID" : 26,
       | "Name" : "${shuffleRead.REMOTE_MERGED_BYTES_READ}",
       | "Update" : 0,
       | "Internal" : true,
       | "Count Failed Values" : true
       | },
       | {
-      | "ID" : 25,
+      | "ID" : 27,
       | "Name" : "${shuffleRead.LOCAL_MERGED_BYTES_READ}",
       | "Update" : 0,
       | "Internal" : true,
       | "Count Failed Values" : true
       | },
       | {
-      | "ID" : 26,
+      | "ID" : 28,
       | "Name" : "${shuffleRead.REMOTE_REQS_DURATION}",
       | "Update" : 0,
       | "Internal" : true,
       | "Count Failed Values" : true
       | },
       | {
-      | "ID" : 27,
+      | "ID" : 29,
       | "Name" : "${shuffleRead.REMOTE_MERGED_REQS_DURATION}",
       | "Update" : 0,
       | "Internal" : true,
       | "Count Failed Values" : true
       | },
       | {
-      | "ID": 28,
+      | "ID": 30,
       | "Name": "${shuffleWrite.BYTES_WRITTEN}",
       | "Update": 0,
       | "Internal": true,
       | "Count Failed Values": true
       | },
       | {
-      | "ID": 29,
+      | "ID": 31,
       | "Name": "${shuffleWrite.RECORDS_WRITTEN}",
       | "Update": 0,
       | "Internal": true,
       | "Count Failed Values": true
       | },
       | {
-      | "ID": 30,
+      | "ID": 32,
       | "Name": "${shuffleWrite.WRITE_TIME}",
       | "Update": 0,
       | "Internal": true,
       | "Count Failed Values": true
       | },
       | {
-      | "ID": 31,
+      | "ID": 33,
       | "Name": "${input.BYTES_READ}",
       | "Update": 2100,
       | "Internal": true,
       | "Count Failed Values": true
       | },
       | {
-      | "ID": 32,
+      | "ID": 34,
       | "Name": "${input.RECORDS_READ}",
       | "Update": 21,
       | "Internal": true,
       | "Count Failed Values": true
       | },
       | {
-      | "ID": 33,
+      | "ID": 35,
       | "Name": "${output.BYTES_WRITTEN}",
       | "Update": 1200,
       | "Internal": true,
       | "Count Failed Values": true
       | },
       | {
-      | "ID": 34,
+      | "ID": 36,
       | "Name": "${output.RECORDS_WRITTEN}",
       | "Update": 12,
       | "Internal": true,
       | "Count Failed Values": true
       | },
       | {
-      | "ID": 35,
+      | "ID": 37,
       | "Name": "$TEST_ACCUM",
       | "Update": 0,
       | "Internal": true,
From 89a703fe51a8686ecdc51d1d5e3323d2aba65bc6 Mon Sep 17 00:00:00 2001
From: Ziqi Liu
Date: Wed, 3 Jul 2024 11:58:39 -0700
Subject: [PATCH 3/9] rename

---
 .../org/apache/spark/memory/TaskMemoryManager.java | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 39479d53fa665..d8eca8f037c1d 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -125,12 +125,12 @@ public class TaskMemoryManager {
   /**
    * Peak off heap memory usage by this task.
    */
-  private long peakTaskOffHeapMemory = 0L;
+  private long peakOffHeapMemory = 0L;
 
   /**
    * Peak on heap memory usage by this task.
    */
-  private long peakTaskOnHeapMemory = 0L;
+  private long peakOnHeapMemory = 0L;
 
   /**
    * Construct a new TaskMemoryManager.
@@ -214,10 +214,10 @@ public long acquireExecutionMemory(long required, MemoryConsumer requestingConsu
       }
 
       if (mode == MemoryMode.OFF_HEAP) {
-        peakTaskOffHeapMemory = Math.max(peakTaskOffHeapMemory,
+        peakOffHeapMemory = Math.max(peakOffHeapMemory,
           memoryManager.getOffHeapExecutionMemoryUsageForTask(taskAttemptId));
       } else {
-        peakTaskOnHeapMemory = Math.max(peakTaskOnHeapMemory,
+        peakOnHeapMemory = Math.max(peakOnHeapMemory,
           memoryManager.getOnHeapExecutionMemoryUsageForTask(taskAttemptId));
       }
       return got;
@@ -531,13 +531,13 @@ public MemoryMode getTungstenMemoryMode() {
    *
    */
   public long getPeakOnHeapExecutionMemory() {
-    return peakTaskOnHeapMemory;
+    return peakOnHeapMemory;
   }
 
   /**
    * Returns peak task-level off-heap memory usage in bytes.
    */
   public long getPeakOffHeapExecutionMemory() {
-    return peakTaskOffHeapMemory;
+    return peakOffHeapMemory;
   }
 }
"Name": "${output.RECORDS_WRITTEN}", | "Update": 12, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 35, + | "ID": 37, | "Name": "$TEST_ACCUM", | "Update": 0, | "Internal": true, From 89a703fe51a8686ecdc51d1d5e3323d2aba65bc6 Mon Sep 17 00:00:00 2001 From: Ziqi Liu Date: Wed, 3 Jul 2024 11:58:39 -0700 Subject: [PATCH 3/9] rename --- .../org/apache/spark/memory/TaskMemoryManager.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java index 39479d53fa665..d8eca8f037c1d 100644 --- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java +++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java @@ -125,12 +125,12 @@ public class TaskMemoryManager { /** * Peak off heap memory usage by this task. */ - private long peakTaskOffHeapMemory = 0L; + private long peakOffHeapMemory = 0L; /** * Peak on heap memory usage by this task. */ - private long peakTaskOnHeapMemory = 0L; + private long peakOnHeapMemory = 0L; /** * Construct a new TaskMemoryManager. @@ -214,10 +214,10 @@ public long acquireExecutionMemory(long required, MemoryConsumer requestingConsu } if (mode == MemoryMode.OFF_HEAP) { - peakTaskOffHeapMemory = Math.max(peakTaskOffHeapMemory, + peakOffHeapMemory = Math.max(peakOffHeapMemory, memoryManager.getOffHeapExecutionMemoryUsageForTask(taskAttemptId)); } else { - peakTaskOnHeapMemory = Math.max(peakTaskOnHeapMemory, + peakOnHeapMemory = Math.max(peakOnHeapMemory, memoryManager.getOnHeapExecutionMemoryUsageForTask(taskAttemptId)); } return got; @@ -531,13 +531,13 @@ public MemoryMode getTungstenMemoryMode() { * */ public long getPeakOnHeapExecutionMemory() { - return peakTaskOnHeapMemory; + return peakOnHeapMemory; } /** * Returns peak task-level on-heap memory usage in bytes. */ public long getPeakOffHeapExecutionMemory() { - return peakTaskOffHeapMemory; + return peakOffHeapMemory; } } From 70bbdc5c8fb799ebc22c99c958e23672df38238c Mon Sep 17 00:00:00 2001 From: Ziqi Liu Date: Fri, 5 Jul 2024 13:19:25 -0700 Subject: [PATCH 4/9] address comments --- .../org/apache/spark/executor/TaskMetrics.scala | 7 +++++++ .../org/apache/spark/memory/MemoryManagerSuite.scala | 12 ++++-------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index a181ff9418db5..8d100502a5efe 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -112,13 +112,20 @@ class TaskMetrics private[spark] () extends Serializable { * joins. The value of this accumulator should be approximately the sum of the peak sizes * across all such data structures created in this task. For SQL jobs, this only tracks all * unsafe operators and ExternalSort. + * This is not equal to peakOnHeapExecutionMemory + peakOffHeapExecutionMemory */ // TODO: SPARK-48789: the naming is confusing since this does not really reflect the whole // execution memory. We'd better deprecate this once we have a replacement. def peakExecutionMemory: Long = _peakExecutionMemory.sum + /** + * Peak on heap execution memory as tracked by TaskMemoryManager. + */ def peakOnHeapExecutionMemory: Long = _peakOnHeapExecutionMemory.sum + /** + * Peak off heap execution memory as tracked by TaskMemoryManager. 
+ */ def peakOffHeapExecutionMemory: Long = _peakOffHeapExecutionMemory.sum /** diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index 9d568c4a60c0d..1f40ef944a843 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -345,14 +345,10 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite { val offHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.OFF_HEAP) val onHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.ON_HEAP) - val result1 = Future { - tMemManager.acquireExecutionMemory(500L, offHeapConsumer) - } - val result2 = Future { - tMemManager.acquireExecutionMemory(400L, onHeapConsumer) - } - assert(ThreadUtils.awaitResult(result1, 200.millis) === 500L) - assert(ThreadUtils.awaitResult(result2, 200.millis) === 400L) + val result1 = tMemManager.acquireExecutionMemory(500L, offHeapConsumer) + val result2 = tMemManager.acquireExecutionMemory(400L, onHeapConsumer) + assert(result1 === 500L) + assert(result2 === 400L) assert(tMemManager.getMemoryConsumptionForThisTask === 900L) assert(tMemManager.getPeakOnHeapExecutionMemory === 400L) assert(tMemManager.getPeakOffHeapExecutionMemory === 500L) From 12a849baea794c815c8aa0a505d0dbf72a65dc6c Mon Sep 17 00:00:00 2001 From: Ziqi Liu Date: Fri, 5 Jul 2024 13:22:51 -0700 Subject: [PATCH 5/9] make peak*memory volatile --- .../main/java/org/apache/spark/memory/TaskMemoryManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java index d8eca8f037c1d..137e479f1fe45 100644 --- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java +++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java @@ -125,12 +125,12 @@ public class TaskMemoryManager { /** * Peak off heap memory usage by this task. */ - private long peakOffHeapMemory = 0L; + private volatile long peakOffHeapMemory = 0L; /** * Peak on heap memory usage by this task. */ - private long peakOnHeapMemory = 0L; + private volatile long peakOnHeapMemory = 0L; /** * Construct a new TaskMemoryManager. 
From dad880f497d303bfa041b7f553efad19dac85960 Mon Sep 17 00:00:00 2001
From: Ziqi Liu
Date: Mon, 15 Jul 2024 12:59:24 -0700
Subject: [PATCH 6/9] update JsonProtocol

---
 .../src/main/scala/org/apache/spark/util/JsonProtocol.scala | 6 ++++++
 .../scala/org/apache/spark/util/JsonProtocolSuite.scala     | 6 ++++++
 2 files changed, 12 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 22dcf6c11e4b4..d2e21dbe3cbc9 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -597,6 +597,8 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeNumberField("Executor Run Time", taskMetrics.executorRunTime)
     g.writeNumberField("Executor CPU Time", taskMetrics.executorCpuTime)
     g.writeNumberField("Peak Execution Memory", taskMetrics.peakExecutionMemory)
+    g.writeNumberField("Peak On Heap Execution Memory", taskMetrics.peakOnHeapExecutionMemory)
+    g.writeNumberField("Peak Off Heap Execution Memory", taskMetrics.peakOffHeapExecutionMemory)
     g.writeNumberField("Result Size", taskMetrics.resultSize)
     g.writeNumberField("JVM GC Time", taskMetrics.jvmGCTime)
     g.writeNumberField("Result Serialization Time", taskMetrics.resultSerializationTime)
@@ -1254,6 +1256,10 @@ private[spark] object JsonProtocol extends JsonUtils {
     // The "Peak Execution Memory" field was added in Spark 3.0.0:
     metrics.setPeakExecutionMemory(
       jsonOption(json.get("Peak Execution Memory")).map(_.extractLong).getOrElse(0))
+    metrics.setPeakOnHeapExecutionMemory(
+      jsonOption(json.get("Peak On Heap Execution Memory")).map(_.extractLong).getOrElse(0))
+    metrics.setPeakOffHeapExecutionMemory(
+      jsonOption(json.get("Peak Off Heap Execution Memory")).map(_.extractLong).getOrElse(0))
     metrics.setResultSize(json.get("Result Size").extractLong)
     metrics.setJvmGCTime(json.get("JVM GC Time").extractLong)
     metrics.setResultSerializationTime(json.get("Result Serialization Time").extractLong)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 5c14aa6cd3ce9..cdee6ccda706e 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -1733,6 +1733,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
       | "Executor Run Time": 400,
       | "Executor CPU Time": 400,
       | "Peak Execution Memory": 500,
+      | "Peak On Heap Execution Memory": 500,
+      | "Peak Off Heap Execution Memory": 500,
       | "Result Size": 500,
       | "JVM GC Time": 600,
       | "Result Serialization Time": 700,
@@ -1876,6 +1876,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
       | "Executor Run Time": 400,
       | "Executor CPU Time": 400,
       | "Peak Execution Memory": 500,
+      | "Peak On Heap Execution Memory": 500,
+      | "Peak Off Heap Execution Memory": 500,
       | "Result Size": 500,
       | "JVM GC Time": 600,
       | "Result Serialization Time": 700,
@@ -2019,6 +2019,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
       | "Executor Run Time": 400,
       | "Executor CPU Time": 400,
       | "Peak Execution Memory": 500,
+      | "Peak On Heap Execution Memory": 500,
+      | "Peak Off Heap Execution Memory": 500,
       | "Result Size": 500,
       | "JVM GC Time": 600,
       | "Result Serialization Time": 700,
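On the read side, JsonProtocol falls back to 0 whenever the new fields are absent, so event logs written by older Spark versions still deserialize cleanly. The same defensive pattern expressed with plain Jackson (a sketch of the idea only; Spark's actual reader goes through its jsonOption helper as shown above):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Defensive read of an optional numeric field, defaulting to 0 for logs
// written before the field existed.
public class OptionalFieldRead {
  public static void main(String[] args) throws Exception {
    String oldLog = "{\"Peak Execution Memory\": 500}";  // no new fields
    JsonNode json = new ObjectMapper().readTree(oldLog);
    JsonNode peakOnHeap = json.get("Peak On Heap Execution Memory");
    long value = peakOnHeap == null ? 0L : peakOnHeap.asLong();
    System.out.println(value);  // 0: a missing field falls back to the default
  }
}
```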
From 8cec467fb563b3e8cef5d7281665f5db19a3786d Mon Sep 17 00:00:00 2001
From: Ziqi Liu
Date: Mon, 15 Jul 2024 20:25:57 -0700
Subject: [PATCH 7/9] maintain peak memory in task memory manager level

---
 .../spark/memory/TaskMemoryManager.java       | 35 ++++++++++++++++---
 .../apache/spark/memory/MemoryManager.scala   | 16 ----------
 2 files changed, 31 insertions(+), 20 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 137e479f1fe45..9e9dbba75944e 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -122,6 +122,20 @@ public class TaskMemoryManager {
    */
   private volatile long acquiredButNotUsed = 0L;
 
+  /**
+   * Current off heap memory usage by this task.
+   */
+  private long currentOffHeapMemory = 0L;
+
+  private final Object offHeapMemoryLock = new Object();
+
+  /**
+   * Current on heap memory usage by this task.
+   */
+  private long currentOnHeapMemory = 0L;
+
+  private final Object onHeapMemoryLock = new Object();
+
   /**
    * Peak off heap memory usage by this task.
    */
@@ -214,11 +228,15 @@ public long acquireExecutionMemory(long required, MemoryConsumer requestingConsu
       }
 
       if (mode == MemoryMode.OFF_HEAP) {
-        peakOffHeapMemory = Math.max(peakOffHeapMemory,
-          memoryManager.getOffHeapExecutionMemoryUsageForTask(taskAttemptId));
+        synchronized (offHeapMemoryLock) {
+          currentOffHeapMemory += got;
+          peakOffHeapMemory = Math.max(peakOffHeapMemory, currentOffHeapMemory);
+        }
       } else {
-        peakOnHeapMemory = Math.max(peakOnHeapMemory,
-          memoryManager.getOnHeapExecutionMemoryUsageForTask(taskAttemptId));
+        synchronized (onHeapMemoryLock) {
+          currentOnHeapMemory += got;
+          peakOnHeapMemory = Math.max(peakOnHeapMemory, currentOnHeapMemory);
+        }
       }
       return got;
     }
@@ -287,6 +305,15 @@ public void releaseExecutionMemory(long size, MemoryConsumer consumer) {
         consumer);
     }
     memoryManager.releaseExecutionMemory(size, taskAttemptId, consumer.getMode());
+    if (consumer.getMode() == MemoryMode.OFF_HEAP) {
+      synchronized (offHeapMemoryLock) {
+        currentOffHeapMemory -= size;
+      }
+    } else {
+      synchronized (onHeapMemoryLock) {
+        currentOnHeapMemory -= size;
+      }
+    }
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index 058cc5a5e9ead..c33fca35764e5 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -219,22 +219,6 @@ private[spark] abstract class MemoryManager(
     offHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId)
   }
 
-  /**
-   * Return the on-heap execution memory consumption, in bytes, for the given task.
-   */
-  private[memory] def getOnHeapExecutionMemoryUsageForTask(taskAttemptId: Long)
-    : Long = synchronized {
-    onHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId)
-  }
-
-  /**
-   * Return the off-heap execution memory consumption, in bytes, for the given task.
-   */
-  private[memory] def getOffHeapExecutionMemoryUsageForTask(taskAttemptId: Long)
-    : Long = synchronized {
-    offHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId)
-  }
-
   // -- Fields related to Tungsten managed memory -------------------------------------------------
 
   /**
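This revision moves the bookkeeping fully into the task: the task owns current-usage counters, and per-mode locks make the increment and the peak comparison a single atomic step, so a concurrent acquire or release cannot interleave between them. The guarded pattern in isolation (hypothetical names, not Spark API):

```java
// The increment and the peak comparison must happen as one atomic step;
// otherwise another thread could change the counter between them and the
// recorded peak would be stale.
public class GuardedPeak {
  private final Object lock = new Object();
  private long current = 0L;
  private long peak = 0L;

  public void acquired(long bytes) {
    synchronized (lock) {
      current += bytes;
      peak = Math.max(peak, current);
    }
  }

  public void released(long bytes) {
    synchronized (lock) {
      current -= bytes;
    }
  }

  public long getPeak() {
    synchronized (lock) {
      return peak;
    }
  }

  public static void main(String[] args) {
    GuardedPeak g = new GuardedPeak();
    g.acquired(500L);
    g.acquired(400L);
    g.released(600L);
    System.out.println(g.getPeak());  // 900
  }
}
```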
- */ - private[memory] def getOffHeapExecutionMemoryUsageForTask(taskAttemptId: Long) - : Long = synchronized { - offHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId) - } - // -- Fields related to Tungsten managed memory ------------------------------------------------- /** From 163fe06e010e637d98ff8eb2921d2612f95be177 Mon Sep 17 00:00:00 2001 From: Ziqi Liu Date: Wed, 17 Jul 2024 17:43:04 -0700 Subject: [PATCH 8/9] update --- .../spark/memory/TaskMemoryManager.java | 38 ++++--------------- 1 file changed, 7 insertions(+), 31 deletions(-) diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java index 9e9dbba75944e..541ed7e2350b2 100644 --- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java +++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java @@ -122,20 +122,6 @@ public class TaskMemoryManager { */ private volatile long acquiredButNotUsed = 0L; - /** - * Current off heap memory usage by this task. - */ - private long currentOffHeapMemory = 0L; - - private final Object offHeapMemoryLock = new Object(); - - /* - * Current on heap memory usage by this task. - */ - private long currentOnHeapMemory = 0L; - - private final Object onHeapMemoryLock = new Object(); - /** * Peak off heap memory usage by this task. */ @@ -227,17 +213,16 @@ public long acquireExecutionMemory(long required, MemoryConsumer requestingConsu requestingConsumer); } + // Consumer will update its used memory after acquireExecutionMemory, so we need to add `got` + // to compute current peak + long currentPeak = consumers.stream().filter(c -> c.getMode() == mode) + .mapToLong(MemoryConsumer::getUsed).sum() + got; if (mode == MemoryMode.OFF_HEAP) { - synchronized (offHeapMemoryLock) { - currentOffHeapMemory += got; - peakOffHeapMemory = Math.max(peakOffHeapMemory, currentOffHeapMemory); - } + peakOffHeapMemory = Math.max(peakOffHeapMemory, currentPeak); } else { - synchronized (onHeapMemoryLock) { - currentOnHeapMemory += got; - peakOnHeapMemory = Math.max(peakOnHeapMemory, currentOnHeapMemory); - } + peakOnHeapMemory = Math.max(peakOnHeapMemory, currentPeak); } + return got; } } @@ -305,15 +290,6 @@ public void releaseExecutionMemory(long size, MemoryConsumer consumer) { consumer); } memoryManager.releaseExecutionMemory(size, taskAttemptId, consumer.getMode()); - if (consumer.getMode() == MemoryMode.OFF_HEAP) { - synchronized (offHeapMemoryLock) { - currentOffHeapMemory -= size; - } - } else { - synchronized (onHeapMemoryLock) { - currentOnHeapMemory -= size; - } - } } /** From 110ccaeead0940989ff36c75ec3df5979ab49353 Mon Sep 17 00:00:00 2001 From: Ziqi Liu Date: Tue, 23 Jul 2024 15:18:56 -0700 Subject: [PATCH 9/9] empty commit