Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 0 additions & 36 deletions core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,6 @@ public class TaskMemoryManager {
*/
private volatile long acquiredButNotUsed = 0L;

/**
* Peak off heap memory usage by this task.
*/
private volatile long peakOffHeapMemory = 0L;

/**
* Peak on heap memory usage by this task.
*/
private volatile long peakOnHeapMemory = 0L;

/**
* Construct a new TaskMemoryManager.
*/
Expand Down Expand Up @@ -212,17 +202,6 @@ public long acquireExecutionMemory(long required, MemoryConsumer requestingConsu
logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got),
requestingConsumer);
}

// Consumer will update its used memory after acquireExecutionMemory, so we need to add `got`
// to compute current peak
long currentPeak = consumers.stream().filter(c -> c.getMode() == mode)
.mapToLong(MemoryConsumer::getUsed).sum() + got;
if (mode == MemoryMode.OFF_HEAP) {
peakOffHeapMemory = Math.max(peakOffHeapMemory, currentPeak);
} else {
peakOnHeapMemory = Math.max(peakOnHeapMemory, currentPeak);
}

return got;
}
}
Expand Down Expand Up @@ -528,19 +507,4 @@ public long getMemoryConsumptionForThisTask() {
public MemoryMode getTungstenMemoryMode() {
return tungstenMemoryMode;
}

/**
* Returns peak task-level off-heap memory usage in bytes.
*
*/
public long getPeakOnHeapExecutionMemory() {
return peakOnHeapMemory;
}

/**
* Returns peak task-level on-heap memory usage in bytes.
*/
public long getPeakOffHeapExecutionMemory() {
return peakOffHeapMemory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ private[spark] object InternalAccumulator {
val MEMORY_BYTES_SPILLED = METRICS_PREFIX + "memoryBytesSpilled"
val DISK_BYTES_SPILLED = METRICS_PREFIX + "diskBytesSpilled"
val PEAK_EXECUTION_MEMORY = METRICS_PREFIX + "peakExecutionMemory"
val PEAK_ON_HEAP_EXECUTION_MEMORY = METRICS_PREFIX + "peakOnHeapExecutionMemory"
val PEAK_OFF_HEAP_EXECUTION_MEMORY = METRICS_PREFIX + "peakOffHeapExecutionMemory"
val UPDATED_BLOCK_STATUSES = METRICS_PREFIX + "updatedBlockStatuses"
val TEST_ACCUM = METRICS_PREFIX + "testAccumulator"

Expand Down
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,6 @@ private[spark] class Executor(
task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
task.metrics.setResultSerializationTime(TimeUnit.NANOSECONDS.toMillis(
afterSerializationNs - beforeSerializationNs))
task.metrics.setPeakOnHeapExecutionMemory(taskMemoryManager.getPeakOnHeapExecutionMemory)
task.metrics.setPeakOffHeapExecutionMemory(taskMemoryManager.getPeakOffHeapExecutionMemory)
// Expose task metrics using the Dropwizard metrics system.
// Update task metrics counters
executorSource.METRIC_CPU_TIME.inc(task.metrics.executorCpuTime)
Expand Down
21 changes: 0 additions & 21 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ class TaskMetrics private[spark] () extends Serializable {
private val _memoryBytesSpilled = new LongAccumulator
private val _diskBytesSpilled = new LongAccumulator
private val _peakExecutionMemory = new LongAccumulator
private val _peakOnHeapExecutionMemory = new LongAccumulator
private val _peakOffHeapExecutionMemory = new LongAccumulator
private val _updatedBlockStatuses = new CollectionAccumulator[(BlockId, BlockStatus)]

/**
Expand Down Expand Up @@ -111,22 +109,9 @@ class TaskMetrics private[spark] () extends Serializable {
* joins. The value of this accumulator should be approximately the sum of the peak sizes
* across all such data structures created in this task. For SQL jobs, this only tracks all
* unsafe operators and ExternalSort.
* This is not equal to peakOnHeapExecutionMemory + peakOffHeapExecutionMemory
*/
// TODO: SPARK-48789: the naming is confusing since this does not really reflect the whole
// execution memory. We'd better deprecate this once we have a replacement.
def peakExecutionMemory: Long = _peakExecutionMemory.sum

/**
* Peak on heap execution memory as tracked by TaskMemoryManager.
*/
def peakOnHeapExecutionMemory: Long = _peakOnHeapExecutionMemory.sum

/**
* Peak off heap execution memory as tracked by TaskMemoryManager.
*/
def peakOffHeapExecutionMemory: Long = _peakOffHeapExecutionMemory.sum

/**
* Storage statuses of any blocks that have been updated as a result of this task.
*
Expand Down Expand Up @@ -154,10 +139,6 @@ class TaskMetrics private[spark] () extends Serializable {
private[spark] def setResultSerializationTime(v: Long): Unit =
_resultSerializationTime.setValue(v)
private[spark] def setPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.setValue(v)
private[spark] def setPeakOnHeapExecutionMemory(v: Long): Unit =
_peakOnHeapExecutionMemory.setValue(v)
private[spark] def setPeakOffHeapExecutionMemory(v: Long): Unit =
_peakOffHeapExecutionMemory.setValue(v)
private[spark] def incMemoryBytesSpilled(v: Long): Unit = _memoryBytesSpilled.add(v)
private[spark] def incDiskBytesSpilled(v: Long): Unit = _diskBytesSpilled.add(v)
private[spark] def incPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.add(v)
Expand Down Expand Up @@ -244,8 +225,6 @@ class TaskMetrics private[spark] () extends Serializable {
MEMORY_BYTES_SPILLED -> _memoryBytesSpilled,
DISK_BYTES_SPILLED -> _diskBytesSpilled,
PEAK_EXECUTION_MEMORY -> _peakExecutionMemory,
PEAK_ON_HEAP_EXECUTION_MEMORY -> _peakOnHeapExecutionMemory,
PEAK_OFF_HEAP_EXECUTION_MEMORY -> _peakOffHeapExecutionMemory,
UPDATED_BLOCK_STATUSES -> _updatedBlockStatuses,
shuffleRead.REMOTE_BLOCKS_FETCHED -> shuffleReadMetrics._remoteBlocksFetched,
shuffleRead.LOCAL_BLOCKS_FETCHED -> shuffleReadMetrics._localBlocksFetched,
Expand Down
6 changes: 0 additions & 6 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -597,8 +597,6 @@ private[spark] object JsonProtocol extends JsonUtils {
g.writeNumberField("Executor Run Time", taskMetrics.executorRunTime)
g.writeNumberField("Executor CPU Time", taskMetrics.executorCpuTime)
g.writeNumberField("Peak Execution Memory", taskMetrics.peakExecutionMemory)
g.writeNumberField("Peak On Heap Execution Memory", taskMetrics.peakOnHeapExecutionMemory)
g.writeNumberField("Peak Off Heap Execution Memory", taskMetrics.peakOffHeapExecutionMemory)
g.writeNumberField("Result Size", taskMetrics.resultSize)
g.writeNumberField("JVM GC Time", taskMetrics.jvmGCTime)
g.writeNumberField("Result Serialization Time", taskMetrics.resultSerializationTime)
Expand Down Expand Up @@ -1256,10 +1254,6 @@ private[spark] object JsonProtocol extends JsonUtils {
// The "Peak Execution Memory" field was added in Spark 3.0.0:
metrics.setPeakExecutionMemory(
jsonOption(json.get("Peak Execution Memory")).map(_.extractLong).getOrElse(0))
metrics.setPeakOnHeapExecutionMemory(
jsonOption(json.get("Peak On Heap Execution Memory")).map(_.extractLong).getOrElse(0))
metrics.setPeakOffHeapExecutionMemory(
jsonOption(json.get("Peak Off Heap Execution Memory")).map(_.extractLong).getOrElse(0))
metrics.setResultSize(json.get("Result Size").extractLong)
metrics.setJvmGCTime(json.get("JVM GC Time").extractLong)
metrics.setResultSerializationTime(json.get("Result Serialization Time").extractLong)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,24 +335,6 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
tMemManager.releaseExecutionMemory(500L, c)
assert(tMemManager.getMemoryConsumptionForThisTask === 0L)
}

test("task peak execution memory usage") {
val memoryManager = createMemoryManager(
maxOnHeapExecutionMemory = 1000L,
maxOffHeapExecutionMemory = 1000L)

val tMemManager = new TaskMemoryManager(memoryManager, 1)
val offHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.OFF_HEAP)
val onHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.ON_HEAP)

val result1 = tMemManager.acquireExecutionMemory(500L, offHeapConsumer)
val result2 = tMemManager.acquireExecutionMemory(400L, onHeapConsumer)
assert(result1 === 500L)
assert(result2 === 400L)
assert(tMemManager.getMemoryConsumptionForThisTask === 900L)
assert(tMemManager.getPeakOnHeapExecutionMemory === 400L)
assert(tMemManager.getPeakOffHeapExecutionMemory === 500L)
}
}

private object MemoryManagerSuite {
Expand Down
Loading