Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ message TaskDataWrapper {
int64 shuffle_merged_local_bytes_read = 49;
int64 shuffle_remote_reqs_duration = 50;
int64 shuffle_merged_remote_req_duration = 51;
int64 peak_on_heap_execution_memory = 52;
int64 peak_off_heap_execution_memory = 53;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add to ExecutorStageSummary, ExecutorSummary, and ExecutorMetricsDistributions as well.
(Here and in the other model classes.)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we already have executor-level stage memory metrics:

  • ExecutorStageSummary has val peakMemoryMetrics: Option[ExecutorMetrics]
  • ExecutorSummary has val memoryMetrics: Option[MemoryMetrics]
  • ExecutorMetricsDistributions has val peakMemoryMetrics: ExecutorPeakMetricsDistributions

which are aggregated through AppStatusListener.updateStageLevelPeakExecutorMetrics

}

message ExecutorMetrics {
Expand Down Expand Up @@ -323,6 +325,8 @@ message CachedQuantile {
double shuffle_merged_local_bytes_read = 40;
double shuffle_remote_reqs_duration = 41;
double shuffle_merged_remote_reqs_duration = 42;
double peak_on_heap_execution_memory = 43;
double peak_off_heap_execution_memory = 44;
}

message SpeculationStageSummary {
Expand Down Expand Up @@ -611,6 +615,8 @@ message StageData {
int64 shuffle_merged_remote_reqs_duration = 62;
bool is_shuffle_push_enabled = 63;
int32 shuffle_mergers_count = 64;
int64 peak_on_heap_execution_memory = 65;
int64 peak_off_heap_execution_memory = 66;
}

message TaskMetrics {
Expand All @@ -628,6 +634,8 @@ message TaskMetrics {
OutputMetrics output_metrics = 12;
ShuffleReadMetrics shuffle_read_metrics = 13;
ShuffleWriteMetrics shuffle_write_metrics = 14;
int64 peak_on_heap_execution_memory = 15;
int64 peak_off_heap_execution_memory = 16;
}

message InputMetrics {
Expand Down Expand Up @@ -689,6 +697,8 @@ message TaskMetricDistributions {
OutputMetricDistributions output_metrics = 16;
ShuffleReadMetricDistributions shuffle_read_metrics = 17;
ShuffleWriteMetricDistributions shuffle_write_metrics = 18;
repeated double peak_on_heap_execution_memory = 19;
repeated double peak_off_heap_execution_memory = 20;
}

message InputMetricDistributions {
Expand Down
51 changes: 49 additions & 2 deletions core/src/main/resources/org/apache/spark/ui/static/stagepage.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ function getColumnNameForTaskMetricSummary(columnKey) {
case "peakExecutionMemory":
return "Peak Execution Memory";

case "peakOnHeapExecutionMemory":
return "Peak OnHeap Execution Memory";

case "peakOffHeapExecutionMemory":
return "Peak OffHeap Execution Memory";

case "resultSerializationTime":
return "Result Serialization Time";

Expand Down Expand Up @@ -201,7 +207,8 @@ function displayRowsForSummaryMetricsTable(row, type, columnIndex) {

default:
return (row.columnKey == 'peakExecutionMemory' || row.columnKey == 'memoryBytesSpilled'
|| row.columnKey == 'diskBytesSpilled') ? formatBytes(
|| row.columnKey == 'diskBytesSpilled' || row.columnKey == 'peakOnHeapExecutionMemory'
|| row.columnKey == 'peakOffHeapExecutionMemory') ? formatBytes(
row.data[columnIndex], type) : (formatDuration(row.data[columnIndex]));

}
Expand Down Expand Up @@ -326,7 +333,7 @@ function getStageAttemptId() {
var taskSummaryMetricsTableArray = [];
var taskSummaryMetricsTableCurrentStateArray = [];
var taskSummaryMetricsDataTable;
var optionalColumns = [11, 12, 13, 14, 15, 16, 17, 21];
var optionalColumns = [11, 12, 13, 14, 15, 16, 17, 21, 26, 27];
var taskTableSelector;

var executorOptionalColumns = [15, 16, 17, 18];
Expand All @@ -350,6 +357,8 @@ $(document).ready(function () {
"<div id='result_serialization_time' class='result-serialization-time-checkbox-div'><input type='checkbox' class='toggle-vis' id='box-15' data-column='15' data-metrics-type='task'> Result Serialization Time</div>" +
"<div id='getting_result_time' class='getting-result-time-checkbox-div'><input type='checkbox' class='toggle-vis' id='box-16' data-column='16' data-metrics-type='task'> Getting Result Time</div>" +
"<div id='peak_execution_memory' class='peak-execution-memory-checkbox-div'><input type='checkbox' class='toggle-vis' id='box-17' data-column='17' data-metrics-type='task'> Peak Execution Memory</div>" +
"<div id='peak_onheap_execution_memory' class='peak-onheap-execution-memory-checkbox-div'><input type='checkbox' class='toggle-vis' id='box-26' data-column='26' data-metrics-type='task'> Peak OnHeap Execution Memory</div>" +
"<div id='peak_offheap_execution_memory' class='peak-offheap-execution-memory-checkbox-div'><input type='checkbox' class='toggle-vis' id='box-27' data-column='27' data-metrics-type='task'> Peak OffHeap Execution Memory</div>" +
"<div id='executor_jvm_on_off_heap_memory' class='executor-jvm-metrics-checkbox-div'><input type='checkbox' class='toggle-vis' id='executor-box-15' data-column='15' data-metrics-type='executor'> Peak JVM Memory OnHeap / OffHeap</div>" +
"<div id='executor_on_off_heap_execution_memory' class='executor-jvm-metrics-checkbox-div'><input type='checkbox' class='toggle-vis' id='executor-box-16' data-column='16' data-metrics-type='executor'> Peak Execution Memory OnHeap / OffHeap</div>" +
"<div id='executor_on_off_heap_storage_memory' class='executor-jvm-metrics-checkbox-div'><input type='checkbox' class='toggle-vis' id='executor-box-17' data-column='17' data-metrics-type='executor'> Peak Storage Memory OnHeap / OffHeap</div>" +
Expand Down Expand Up @@ -385,6 +394,12 @@ $(document).ready(function () {
"should be approximately the sum of the peak sizes across all such data structures created " +
"in this task. For SQL jobs, this only tracks all unsafe operators, broadcast joins, and " +
"external sort.");
$('#peak_onheap_execution_memory').attr("data-toggle", "tooltip")
.attr("data-placement", "top")
.attr("title", "Peak OnHeap Execution memory of this stage");
$('#peak_offheap_execution_memory').attr("data-toggle", "tooltip")
.attr("data-placement", "top")
.attr("title", "Peak OffHeap Execution memory of this stage");
$('[data-toggle="tooltip"]').tooltip();
var tasksSummary = $("#parent-container");
getStandAloneAppId(function (appId) {
Expand Down Expand Up @@ -747,6 +762,18 @@ $(document).ready(function () {
taskSummaryMetricsTableArray.push(row);
break;

case "peakOnHeapExecutionMemory":
row = createRowMetadataForColumn(
columnKey, taskMetricsResponse[columnKey], 26);
taskSummaryMetricsTableArray.push(row);
break;

case "peakOffHeapExecutionMemory":
row = createRowMetadataForColumn(
columnKey, taskMetricsResponse[columnKey], 27);
taskSummaryMetricsTableArray.push(row);
break;

case "inputMetrics":
row = createRowMetadataForColumn(
columnKey, taskMetricsResponse[columnKey], 1);
Expand Down Expand Up @@ -1069,6 +1096,26 @@ $(document).ready(function () {
},
name: "Spill (Disk)"
},
{
data : function (row, type) {
if (row.taskMetrics && row.taskMetrics.peakOnHeapExecutionMemory) {
return type === 'display' ? formatBytes(row.taskMetrics.peakOnHeapExecutionMemory, type) : row.taskMetrics.peakOnHeapExecutionMemory;
} else {
return "";
}
},
name: "Peak OnHeap Execution Memory"
},
{
data : function (row, type) {
if (row.taskMetrics && row.taskMetrics.peakOffHeapExecutionMemory) {
return type === 'display' ? formatBytes(row.taskMetrics.peakOffHeapExecutionMemory, type) : row.taskMetrics.peakOffHeapExecutionMemory;
} else {
return "";
}
},
name: "Peak OffHeap Execution Memory"
},
{
data : function (row, _ignored_type) {
var msg = row.errorMessage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ <h4 id="tasksTitle" class="title-table"></h4>
<th>Shuffle Read Size / Records</th>
<th>Spill (Memory)</th>
<th>Spill (Disk)</th>
<th>Peak OnHeap Execution Memory</th>
<th>Peak OffHeap Execution Memory</th>
<th>Errors</th>
</tr>
</thead>
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ private[spark] class AppStatusStore(
gettingResultTime = toValues(_.gettingResultTime),
schedulerDelay = toValues(_.schedulerDelay),
peakExecutionMemory = toValues(_.peakExecutionMemory),
peakOnHeapExecutionMemory = toValues(_.peakOnHeapExecutionMemory),
peakOffHeapExecutionMemory = toValues(_.peakOffHeapExecutionMemory),
memoryBytesSpilled = toValues(_.memoryBytesSpilled),
diskBytesSpilled = toValues(_.diskBytesSpilled),
inputMetrics = new v1.InputMetricDistributions(
Expand Down Expand Up @@ -399,6 +401,10 @@ private[spark] class AppStatusStore(
},
schedulerDelay = scanTasks(TaskIndexNames.SCHEDULER_DELAY) { t => t.schedulerDelay },
peakExecutionMemory = scanTasks(TaskIndexNames.PEAK_MEM) { t => t.peakExecutionMemory },
peakOnHeapExecutionMemory =
scanTasks(TaskIndexNames.PEAK_ON_HEAP_MEM) { t => t.peakOnHeapExecutionMemory },
peakOffHeapExecutionMemory =
scanTasks(TaskIndexNames.PEAK_OFF_HEAP_MEM) { t => t.peakOffHeapExecutionMemory },
memoryBytesSpilled = scanTasks(TaskIndexNames.MEM_SPILL) { t => t.memoryBytesSpilled },
diskBytesSpilled = scanTasks(TaskIndexNames.DISK_SPILL) { t => t.diskBytesSpilled },
inputMetrics = new v1.InputMetricDistributions(
Expand Down Expand Up @@ -474,6 +480,8 @@ private[spark] class AppStatusStore(
gettingResultTime = computedQuantiles.gettingResultTime(idx),
schedulerDelay = computedQuantiles.schedulerDelay(idx),
peakExecutionMemory = computedQuantiles.peakExecutionMemory(idx),
peakOnHeapExecutionMemory = computedQuantiles.peakOnHeapExecutionMemory(idx),
peakOffHeapExecutionMemory = computedQuantiles.peakOffHeapExecutionMemory(idx),
memoryBytesSpilled = computedQuantiles.memoryBytesSpilled(idx),
diskBytesSpilled = computedQuantiles.diskBytesSpilled(idx),

Expand Down Expand Up @@ -692,6 +700,8 @@ private[spark] class AppStatusStore(
memoryBytesSpilled = stage.memoryBytesSpilled,
diskBytesSpilled = stage.diskBytesSpilled,
peakExecutionMemory = stage.peakExecutionMemory,
peakOnHeapExecutionMemory = stage.peakOnHeapExecutionMemory,
peakOffHeapExecutionMemory = stage.peakOffHeapExecutionMemory,
inputBytes = stage.inputBytes,
inputRecords = stage.inputRecords,
outputBytes = stage.outputBytes,
Expand Down
16 changes: 15 additions & 1 deletion core/src/main/scala/org/apache/spark/status/LiveEntity.scala
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ private class LiveTask(
metrics.memoryBytesSpilled,
metrics.diskBytesSpilled,
metrics.peakExecutionMemory,
metrics.peakOnHeapExecutionMemory,
metrics.peakOffHeapExecutionMemory,
metrics.inputMetrics.bytesRead,
metrics.inputMetrics.recordsRead,
metrics.outputMetrics.bytesWritten,
Expand Down Expand Up @@ -236,6 +238,8 @@ private class LiveTask(
taskMetrics.memoryBytesSpilled,
taskMetrics.diskBytesSpilled,
taskMetrics.peakExecutionMemory,
taskMetrics.peakOnHeapExecutionMemory,
taskMetrics.peakOffHeapExecutionMemory,
taskMetrics.inputMetrics.bytesRead,
taskMetrics.inputMetrics.recordsRead,
taskMetrics.outputMetrics.bytesWritten,
Expand Down Expand Up @@ -511,6 +515,8 @@ private class LiveStage(var info: StageInfo) extends LiveEntity {
memoryBytesSpilled = metrics.memoryBytesSpilled,
diskBytesSpilled = metrics.diskBytesSpilled,
peakExecutionMemory = metrics.peakExecutionMemory,
peakOnHeapExecutionMemory = metrics.peakOnHeapExecutionMemory,
peakOffHeapExecutionMemory = metrics.peakOffHeapExecutionMemory,
inputBytes = metrics.inputMetrics.bytesRead,
inputRecords = metrics.inputMetrics.recordsRead,
outputBytes = metrics.outputMetrics.bytesWritten,
Expand Down Expand Up @@ -762,6 +768,8 @@ private[spark] object LiveEntityHelpers {
memoryBytesSpilled: Long,
diskBytesSpilled: Long,
peakExecutionMemory: Long,
peakOnHeapExecutionMemory: Long,
peakOffHeapExecutionMemory: Long,
inputBytesRead: Long,
inputRecordsRead: Long,
outputBytesWritten: Long,
Expand Down Expand Up @@ -797,6 +805,8 @@ private[spark] object LiveEntityHelpers {
memoryBytesSpilled,
diskBytesSpilled,
peakExecutionMemory,
peakOnHeapExecutionMemory,
peakOffHeapExecutionMemory,
new v1.InputMetrics(
inputBytesRead,
inputRecordsRead),
Expand Down Expand Up @@ -834,7 +844,7 @@ private[spark] object LiveEntityHelpers {
createMetrics(default, default, default, default, default, default, default, default,
default, default, default, default, default, default, default, default, default,
default, default, default, default, default, default, default, default, default,
default, default, default, default, default, default, default, default)
default, default, default, default, default, default, default, default, default, default)
}

/** Add m2 values to m1. */
Expand Down Expand Up @@ -869,6 +879,8 @@ private[spark] object LiveEntityHelpers {
updateMetricValue(m.memoryBytesSpilled),
updateMetricValue(m.diskBytesSpilled),
updateMetricValue(m.peakExecutionMemory),
updateMetricValue(m.peakOnHeapExecutionMemory),
updateMetricValue(m.peakOffHeapExecutionMemory),
updateMetricValue(m.inputMetrics.bytesRead),
updateMetricValue(m.inputMetrics.recordsRead),
updateMetricValue(m.shuffleReadMetrics.shufflePushReadMetrics.corruptMergedBlockChunks),
Expand Down Expand Up @@ -907,6 +919,8 @@ private[spark] object LiveEntityHelpers {
m1.memoryBytesSpilled + m2.memoryBytesSpilled * mult,
m1.diskBytesSpilled + m2.diskBytesSpilled * mult,
m1.peakExecutionMemory + m2.peakExecutionMemory * mult,
m1.peakOnHeapExecutionMemory + m2.peakOnHeapExecutionMemory * mult,
m1.peakOffHeapExecutionMemory + m2.peakOffHeapExecutionMemory * mult,
m1.inputMetrics.bytesRead + m2.inputMetrics.bytesRead * mult,
m1.inputMetrics.recordsRead + m2.inputMetrics.recordsRead * mult,
m1.outputMetrics.bytesWritten + m2.outputMetrics.bytesWritten * mult,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ private[v1] class StagesResource extends BaseAppResource {
|| containsValue(Utils.bytesToString(task.taskMetrics.get.memoryBytesSpilled))
|| containsValue(Utils.bytesToString(task.taskMetrics.get.diskBytesSpilled))
|| containsValue(Utils.bytesToString(task.taskMetrics.get.peakExecutionMemory))
|| containsValue(Utils.bytesToString(task.taskMetrics.get.peakOnHeapExecutionMemory))
|| containsValue(Utils.bytesToString(task.taskMetrics.get.peakOffHeapExecutionMemory))
|| containsValue(Utils.bytesToString(task.taskMetrics.get.inputMetrics.bytesRead))
|| containsValue(task.taskMetrics.get.inputMetrics.recordsRead)
|| containsValue(Utils.bytesToString(
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ class StageData private[spark](
val memoryBytesSpilled: Long,
val diskBytesSpilled: Long,
val peakExecutionMemory: Long,
val peakOnHeapExecutionMemory: Long,
val peakOffHeapExecutionMemory: Long,
val inputBytes: Long,
val inputRecords: Long,
val outputBytes: Long,
Expand Down Expand Up @@ -350,6 +352,8 @@ class TaskMetrics private[spark](
val memoryBytesSpilled: Long,
val diskBytesSpilled: Long,
val peakExecutionMemory: Long,
val peakOnHeapExecutionMemory: Long,
val peakOffHeapExecutionMemory: Long,
val inputMetrics: InputMetrics,
val outputMetrics: OutputMetrics,
val shuffleReadMetrics: ShuffleReadMetrics,
Expand Down Expand Up @@ -404,6 +408,8 @@ class TaskMetricDistributions private[spark](
val gettingResultTime: IndexedSeq[Double],
val schedulerDelay: IndexedSeq[Double],
val peakExecutionMemory: IndexedSeq[Double],
val peakOnHeapExecutionMemory: IndexedSeq[Double],
val peakOffHeapExecutionMemory: IndexedSeq[Double],
val memoryBytesSpilled: IndexedSeq[Double],
val diskBytesSpilled: IndexedSeq[Double],

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ private[protobuf] class CachedQuantileSerializer extends ProtobufSerDe[CachedQua
.setGettingResultTime(data.gettingResultTime)
.setSchedulerDelay(data.schedulerDelay)
.setPeakExecutionMemory(data.peakExecutionMemory)
.setPeakOnHeapExecutionMemory(data.peakOnHeapExecutionMemory)
.setPeakOffHeapExecutionMemory(data.peakOffHeapExecutionMemory)
.setMemoryBytesSpilled(data.memoryBytesSpilled)
.setDiskBytesSpilled(data.diskBytesSpilled)
.setBytesRead(data.bytesRead)
Expand Down Expand Up @@ -87,6 +89,8 @@ private[protobuf] class CachedQuantileSerializer extends ProtobufSerDe[CachedQua
gettingResultTime = binary.getGettingResultTime,
schedulerDelay = binary.getSchedulerDelay,
peakExecutionMemory = binary.getPeakExecutionMemory,
peakOnHeapExecutionMemory = binary.getPeakOnHeapExecutionMemory,
peakOffHeapExecutionMemory = binary.getPeakOffHeapExecutionMemory,
memoryBytesSpilled = binary.getMemoryBytesSpilled,
diskBytesSpilled = binary.getDiskBytesSpilled,
bytesRead = binary.getBytesRead,
Expand Down
Loading