diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java index 541ed7e2350b2..fe798e40a6ad7 100644 --- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java +++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java @@ -122,16 +122,6 @@ public class TaskMemoryManager { */ private volatile long acquiredButNotUsed = 0L; - /** - * Peak off heap memory usage by this task. - */ - private volatile long peakOffHeapMemory = 0L; - - /** - * Peak on heap memory usage by this task. - */ - private volatile long peakOnHeapMemory = 0L; - /** * Construct a new TaskMemoryManager. */ @@ -212,17 +202,6 @@ public long acquireExecutionMemory(long required, MemoryConsumer requestingConsu logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), requestingConsumer); } - - // Consumer will update its used memory after acquireExecutionMemory, so we need to add `got` - // to compute current peak - long currentPeak = consumers.stream().filter(c -> c.getMode() == mode) - .mapToLong(MemoryConsumer::getUsed).sum() + got; - if (mode == MemoryMode.OFF_HEAP) { - peakOffHeapMemory = Math.max(peakOffHeapMemory, currentPeak); - } else { - peakOnHeapMemory = Math.max(peakOnHeapMemory, currentPeak); - } - return got; } } @@ -528,19 +507,4 @@ public long getMemoryConsumptionForThisTask() { public MemoryMode getTungstenMemoryMode() { return tungstenMemoryMode; } - - /** - * Returns peak task-level off-heap memory usage in bytes. - * - */ - public long getPeakOnHeapExecutionMemory() { - return peakOnHeapMemory; - } - - /** - * Returns peak task-level on-heap memory usage in bytes. - */ - public long getPeakOffHeapExecutionMemory() { - return peakOffHeapMemory; - } } diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala index 505634d5bb048..ef4609e6d6456 100644 --- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala +++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala @@ -41,8 +41,6 @@ private[spark] object InternalAccumulator { val MEMORY_BYTES_SPILLED = METRICS_PREFIX + "memoryBytesSpilled" val DISK_BYTES_SPILLED = METRICS_PREFIX + "diskBytesSpilled" val PEAK_EXECUTION_MEMORY = METRICS_PREFIX + "peakExecutionMemory" - val PEAK_ON_HEAP_EXECUTION_MEMORY = METRICS_PREFIX + "peakOnHeapExecutionMemory" - val PEAK_OFF_HEAP_EXECUTION_MEMORY = METRICS_PREFIX + "peakOffHeapExecutionMemory" val UPDATED_BLOCK_STATUSES = METRICS_PREFIX + "updatedBlockStatuses" val TEST_ACCUM = METRICS_PREFIX + "testAccumulator" diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 0f0e7cb3d8974..9f1382ac8b1c2 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -706,8 +706,6 @@ private[spark] class Executor( task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) task.metrics.setResultSerializationTime(TimeUnit.NANOSECONDS.toMillis( afterSerializationNs - beforeSerializationNs)) - task.metrics.setPeakOnHeapExecutionMemory(taskMemoryManager.getPeakOnHeapExecutionMemory) - task.metrics.setPeakOffHeapExecutionMemory(taskMemoryManager.getPeakOffHeapExecutionMemory) // Expose task metrics using the Dropwizard metrics system. // Update task metrics counters executorSource.METRIC_CPU_TIME.inc(task.metrics.executorCpuTime) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 582c93007f4f5..8540e8c330a09 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -56,8 +56,6 @@ class TaskMetrics private[spark] () extends Serializable { private val _memoryBytesSpilled = new LongAccumulator private val _diskBytesSpilled = new LongAccumulator private val _peakExecutionMemory = new LongAccumulator - private val _peakOnHeapExecutionMemory = new LongAccumulator - private val _peakOffHeapExecutionMemory = new LongAccumulator private val _updatedBlockStatuses = new CollectionAccumulator[(BlockId, BlockStatus)] /** @@ -111,22 +109,9 @@ class TaskMetrics private[spark] () extends Serializable { * joins. The value of this accumulator should be approximately the sum of the peak sizes * across all such data structures created in this task. For SQL jobs, this only tracks all * unsafe operators and ExternalSort. - * This is not equal to peakOnHeapExecutionMemory + peakOffHeapExecutionMemory */ - // TODO: SPARK-48789: the naming is confusing since this does not really reflect the whole - // execution memory. We'd better deprecate this once we have a replacement. def peakExecutionMemory: Long = _peakExecutionMemory.sum - /** - * Peak on heap execution memory as tracked by TaskMemoryManager. - */ - def peakOnHeapExecutionMemory: Long = _peakOnHeapExecutionMemory.sum - - /** - * Peak off heap execution memory as tracked by TaskMemoryManager. - */ - def peakOffHeapExecutionMemory: Long = _peakOffHeapExecutionMemory.sum - /** * Storage statuses of any blocks that have been updated as a result of this task. * @@ -154,10 +139,6 @@ class TaskMetrics private[spark] () extends Serializable { private[spark] def setResultSerializationTime(v: Long): Unit = _resultSerializationTime.setValue(v) private[spark] def setPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.setValue(v) - private[spark] def setPeakOnHeapExecutionMemory(v: Long): Unit = - _peakOnHeapExecutionMemory.setValue(v) - private[spark] def setPeakOffHeapExecutionMemory(v: Long): Unit = - _peakOffHeapExecutionMemory.setValue(v) private[spark] def incMemoryBytesSpilled(v: Long): Unit = _memoryBytesSpilled.add(v) private[spark] def incDiskBytesSpilled(v: Long): Unit = _diskBytesSpilled.add(v) private[spark] def incPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.add(v) @@ -244,8 +225,6 @@ class TaskMetrics private[spark] () extends Serializable { MEMORY_BYTES_SPILLED -> _memoryBytesSpilled, DISK_BYTES_SPILLED -> _diskBytesSpilled, PEAK_EXECUTION_MEMORY -> _peakExecutionMemory, - PEAK_ON_HEAP_EXECUTION_MEMORY -> _peakOnHeapExecutionMemory, - PEAK_OFF_HEAP_EXECUTION_MEMORY -> _peakOffHeapExecutionMemory, UPDATED_BLOCK_STATUSES -> _updatedBlockStatuses, shuffleRead.REMOTE_BLOCKS_FETCHED -> shuffleReadMetrics._remoteBlocksFetched, shuffleRead.LOCAL_BLOCKS_FETCHED -> shuffleReadMetrics._localBlocksFetched, diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 19cefbc0479a9..4c7de70adafd6 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -597,8 +597,6 @@ private[spark] object JsonProtocol extends JsonUtils { g.writeNumberField("Executor Run Time", taskMetrics.executorRunTime) g.writeNumberField("Executor CPU Time", taskMetrics.executorCpuTime) g.writeNumberField("Peak Execution Memory", taskMetrics.peakExecutionMemory) - g.writeNumberField("Peak On Heap Execution Memory", taskMetrics.peakOnHeapExecutionMemory) - g.writeNumberField("Peak Off Heap Execution Memory", taskMetrics.peakOffHeapExecutionMemory) g.writeNumberField("Result Size", taskMetrics.resultSize) g.writeNumberField("JVM GC Time", taskMetrics.jvmGCTime) g.writeNumberField("Result Serialization Time", taskMetrics.resultSerializationTime) @@ -1256,10 +1254,6 @@ private[spark] object JsonProtocol extends JsonUtils { // The "Peak Execution Memory" field was added in Spark 3.0.0: metrics.setPeakExecutionMemory( jsonOption(json.get("Peak Execution Memory")).map(_.extractLong).getOrElse(0)) - metrics.setPeakOnHeapExecutionMemory( - jsonOption(json.get("Peak On Heap Execution Memory")).map(_.extractLong).getOrElse(0)) - metrics.setPeakOffHeapExecutionMemory( - jsonOption(json.get("Peak Off Heap Execution Memory")).map(_.extractLong).getOrElse(0)) metrics.setResultSize(json.get("Result Size").extractLong) metrics.setJvmGCTime(json.get("JVM GC Time").extractLong) metrics.setResultSerializationTime(json.get("Result Serialization Time").extractLong) diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index 1f40ef944a843..a6f1707a1aabf 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -335,24 +335,6 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite { tMemManager.releaseExecutionMemory(500L, c) assert(tMemManager.getMemoryConsumptionForThisTask === 0L) } - - test("task peak execution memory usage") { - val memoryManager = createMemoryManager( - maxOnHeapExecutionMemory = 1000L, - maxOffHeapExecutionMemory = 1000L) - - val tMemManager = new TaskMemoryManager(memoryManager, 1) - val offHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.OFF_HEAP) - val onHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.ON_HEAP) - - val result1 = tMemManager.acquireExecutionMemory(500L, offHeapConsumer) - val result2 = tMemManager.acquireExecutionMemory(400L, onHeapConsumer) - assert(result1 === 500L) - assert(result2 === 400L) - assert(tMemManager.getMemoryConsumptionForThisTask === 900L) - assert(tMemManager.getPeakOnHeapExecutionMemory === 400L) - assert(tMemManager.getPeakOffHeapExecutionMemory === 500L) - } } private object MemoryManagerSuite { diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index cdee6ccda706e..3eae1b3278e74 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -1453,8 +1453,6 @@ private[spark] object JsonProtocolSuite extends Assertions { t.setExecutorRunTime(b) t.setExecutorCpuTime(b) t.setPeakExecutionMemory(c) - t.setPeakOnHeapExecutionMemory(c) - t.setPeakOffHeapExecutionMemory(c) t.setResultSize(c) t.setJvmGCTime(d) t.setResultSerializationTime(a + b) @@ -1733,8 +1731,6 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Executor Run Time": 400, | "Executor CPU Time": 400, | "Peak Execution Memory": 500, - | "Peak On Heap Execution Memory": 500, - | "Peak Off Heap Execution Memory": 500, | "Result Size": 500, | "JVM GC Time": 600, | "Result Serialization Time": 700, @@ -1876,8 +1872,6 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Executor Run Time": 400, | "Executor CPU Time": 400, | "Peak Execution Memory": 500, - | "Peak On Heap Execution Memory": 500, - | "Peak Off Heap Execution Memory": 500, | "Result Size": 500, | "JVM GC Time": 600, | "Result Serialization Time": 700, @@ -2019,8 +2013,6 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Executor Run Time": 400, | "Executor CPU Time": 400, | "Peak Execution Memory": 500, - | "Peak On Heap Execution Memory": 500, - | "Peak Off Heap Execution Memory": 500, | "Result Size": 500, | "JVM GC Time": 600, | "Result Serialization Time": 700, @@ -2691,20 +2683,6 @@ private[spark] object JsonProtocolSuite extends Assertions { | }, | { | "ID": 10, - | "Name": "$PEAK_ON_HEAP_EXECUTION_MEMORY", - | "Update": 500, - | "Internal": true, - | "Count Failed Values": true - | }, - | { - | "ID": 11, - | "Name": "$PEAK_OFF_HEAP_EXECUTION_MEMORY", - | "Update": 500, - | "Internal": true, - | "Count Failed Values": true - | }, - | { - | "ID": 12, | "Name": "$UPDATED_BLOCK_STATUSES", | "Update": [ | { @@ -2726,175 +2704,175 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Count Failed Values": true | }, | { - | "ID": 13, + | "ID": 11, | "Name": "${shuffleRead.REMOTE_BLOCKS_FETCHED}", | "Update": 0, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 14, + | "ID": 12, | "Name": "${shuffleRead.LOCAL_BLOCKS_FETCHED}", | "Update": 0, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 15, + | "ID": 13, | "Name": "${shuffleRead.REMOTE_BYTES_READ}", | "Update": 0, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 16, + | "ID": 14, | "Name": "${shuffleRead.REMOTE_BYTES_READ_TO_DISK}", | "Update": 0, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 17, + | "ID": 15, | "Name": "${shuffleRead.LOCAL_BYTES_READ}", | "Update": 0, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 18, + | "ID": 16, | "Name": "${shuffleRead.FETCH_WAIT_TIME}", | "Update": 0, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 19, + | "ID": 17, | "Name": "${shuffleRead.RECORDS_READ}", | "Update": 0, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 20, + | "ID": 18, | "Name": "${shuffleRead.CORRUPT_MERGED_BLOCK_CHUNKS}", | "Update": 0, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 21, + | "ID": 19, | "Name": "${shuffleRead.MERGED_FETCH_FALLBACK_COUNT}", | "Update": 0, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID" : 22, + | "ID" : 20, | "Name" : "${shuffleRead.REMOTE_MERGED_BLOCKS_FETCHED}", | "Update" : 0, | "Internal" : true, | "Count Failed Values" : true | }, | { - | "ID" : 23, + | "ID" : 21, | "Name" : "${shuffleRead.LOCAL_MERGED_BLOCKS_FETCHED}", | "Update" : 0, | "Internal" : true, | "Count Failed Values" : true | }, | { - | "ID" : 24, + | "ID" : 22, | "Name" : "${shuffleRead.REMOTE_MERGED_CHUNKS_FETCHED}", | "Update" : 0, | "Internal" : true, | "Count Failed Values" : true | }, | { - | "ID" : 25, + | "ID" : 23, | "Name" : "${shuffleRead.LOCAL_MERGED_CHUNKS_FETCHED}", | "Update" : 0, | "Internal" : true, | "Count Failed Values" : true | }, | { - | "ID" : 26, + | "ID" : 24, | "Name" : "${shuffleRead.REMOTE_MERGED_BYTES_READ}", | "Update" : 0, | "Internal" : true, | "Count Failed Values" : true | }, | { - | "ID" : 27, + | "ID" : 25, | "Name" : "${shuffleRead.LOCAL_MERGED_BYTES_READ}", | "Update" : 0, | "Internal" : true, | "Count Failed Values" : true | }, | { - | "ID" : 28, + | "ID" : 26, | "Name" : "${shuffleRead.REMOTE_REQS_DURATION}", | "Update" : 0, | "Internal" : true, | "Count Failed Values" : true | }, | { - | "ID" : 29, + | "ID" : 27, | "Name" : "${shuffleRead.REMOTE_MERGED_REQS_DURATION}", | "Update" : 0, | "Internal" : true, | "Count Failed Values" : true | }, | { - | "ID": 30, + | "ID": 28, | "Name": "${shuffleWrite.BYTES_WRITTEN}", | "Update": 0, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 31, + | "ID": 29, | "Name": "${shuffleWrite.RECORDS_WRITTEN}", | "Update": 0, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 32, + | "ID": 30, | "Name": "${shuffleWrite.WRITE_TIME}", | "Update": 0, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 33, + | "ID": 31, | "Name": "${input.BYTES_READ}", | "Update": 2100, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 34, + | "ID": 32, | "Name": "${input.RECORDS_READ}", | "Update": 21, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 35, + | "ID": 33, | "Name": "${output.BYTES_WRITTEN}", | "Update": 1200, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 36, + | "ID": 34, | "Name": "${output.RECORDS_WRITTEN}", | "Update": 12, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 37, + | "ID": 35, | "Name": "$TEST_ACCUM", | "Update": 0, | "Internal": true,