From aadd699cd5ce2e100f4e258651e2e7a7cfb8b890 Mon Sep 17 00:00:00 2001
From: Reza Safi
Date: Thu, 13 Dec 2018 00:55:33 -0500
Subject: [PATCH 1/6] Exposing Procfs metrics to the metrics system

---
 .../spark/executor/ProcfsMetricsGetter.scala  | 16 ++++-
 .../spark/executor/ProcfsMetricsSource.scala  | 64 +++++++++++++++++++
 .../spark/internal/config/package.scala       |  5 ++
 .../spark/metrics/ExecutorMetricType.scala    | 23 ++++---
 4 files changed, 98 insertions(+), 10 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/executor/ProcfsMetricsSource.scala

diff --git a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
index af67f41e94af1..91ae8f6f6e4bc 100644
--- a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
@@ -62,7 +62,21 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
         SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
       val shouldLogStageExecutorProcessTreeMetrics =
         SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
-      procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics
+      val shouldAddProcessTreeMetricsToMetricsSet =
+        SparkEnv.get.conf.get(config.METRICS_PROCESS_TREE_METRICS)
+      val pickEitherUIOrMetricsSet = shouldLogStageExecutorProcessTreeMetrics ^
+        shouldAddProcessTreeMetricsToMetricsSet
+      val areBothUIMetricsEnabled = shouldLogStageExecutorProcessTreeMetrics &&
+        shouldAddProcessTreeMetricsToMetricsSet
+      if (areBothUIMetricsEnabled) {
+        logWarning("You have enabled " +
+          "both spark.eventLog.logStageExecutorProcessTreeMetrics.enabled" +
+          " and spark.metrics.logStageExecutorProcessTreeMetrics.enabled. This isn't " +
+          "allowed. As a result Procfs metrics won't be reported to the UI or the metrics set")
+      }
+      (procDirExists.get && shouldLogStageExecutorMetrics && pickEitherUIOrMetricsSet) ||
+        (procDirExists.get && !shouldLogStageExecutorMetrics &&
+        pickEitherUIOrMetricsSet && shouldAddProcessTreeMetricsToMetricsSet)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsSource.scala b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsSource.scala
new file mode 100644
index 0000000000000..bacb1143280d9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsSource.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.internal.config
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.SparkEnv
+
+private[executor] class ProcfsMetricsSource extends Source {
+  override val sourceName = "procfs"
+  override val metricRegistry = new MetricRegistry()
+  var numMetrics: Int = 0
+  var metrics: Map[String, Long] = Map.empty
+  val shouldAddProcessTreeMetricsToMetricsSet =
+    SparkEnv.get.conf.get(config.METRICS_PROCESS_TREE_METRICS)
+
+  private def getProcfsMetrics: Map[String, Long] = {
+    if (numMetrics == 0) {
+      metrics = Map.empty
+      val p = ProcfsMetricsGetter.pTreeInfo.computeAllMetrics()
+      metrics = Map("ProcessTreeJVMVMemory" -> p.jvmVmemTotal,
+        "ProcessTreeJVMRSSMemory" -> p.jvmRSSTotal,
+        "ProcessTreePythonVMemory" -> p.pythonVmemTotal,
+        "ProcessTreePythonRSSMemory" -> p.pythonRSSTotal,
+        "ProcessTreeOtherVMemory" -> p.otherVmemTotal,
+        "ProcessTreeOtherRSSMemory" -> p.otherRSSTotal)
+    }
+    numMetrics = numMetrics + 1
+    if (numMetrics == 6) {
+      numMetrics = 0}
+    metrics
+  }
+  private def registerProcfsMetrics[Long]( name: String) = {
+    metricRegistry.register(MetricRegistry.name("processTree", name), new Gauge[Long] {
+      override def getValue: Long = getProcfsMetrics(name).asInstanceOf[Long]
+    })
+  }
+
+  if (shouldAddProcessTreeMetricsToMetricsSet) {
+    registerProcfsMetrics("ProcessTreeJVMVMemory")
+    registerProcfsMetrics("ProcessTreeJVMRSSMemory")
+    registerProcfsMetrics("ProcessTreePythonVMemory")
+    registerProcfsMetrics("ProcessTreePythonRSSMemory")
+    registerProcfsMetrics("ProcessTreeOtherVMemory")
+    registerProcfsMetrics("ProcessTreeOtherRSSMemory")
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 85bb557abef5d..1f7cc6fdbcdef 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -98,6 +98,11 @@ package object config {
     .booleanConf
     .createWithDefault(false)
 
+  private[spark] val METRICS_PROCESS_TREE_METRICS =
+    ConfigBuilder("spark.metrics.logStageExecutorProcessTreeMetrics.enabled")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val EVENT_LOG_OVERWRITE =
     ConfigBuilder("spark.eventLog.overwrite").booleanConf.createWithDefault(false)
 
diff --git a/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala b/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala
index 704b36d3118b7..0b71f0d157ab7 100644
--- a/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala
@@ -22,7 +22,10 @@ import javax.management.ObjectName
 import scala.collection.mutable
 
 import org.apache.spark.executor.ProcfsMetricsGetter
+import org.apache.spark.internal.config
 import org.apache.spark.memory.MemoryManager
+import org.apache.spark.SparkEnv
+
 
 /**
  * Executor metric types for executor-level metrics stored in ExecutorMetrics.
@@ -85,16 +88,19 @@ case object ProcessTreeMetrics extends ExecutorMetricType {
     "ProcessTreePythonRSSMemory",
     "ProcessTreeOtherVMemory",
     "ProcessTreeOtherRSSMemory")
-
+  val shouldLogStageExecutorProcessTreeMetrics =
+    SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
   override private[spark] def getMetricValues(memoryManager: MemoryManager): Array[Long] = {
-    val allMetrics = ProcfsMetricsGetter.pTreeInfo.computeAllMetrics()
     val processTreeMetrics = new Array[Long](names.length)
-    processTreeMetrics(0) = allMetrics.jvmVmemTotal
-    processTreeMetrics(1) = allMetrics.jvmRSSTotal
-    processTreeMetrics(2) = allMetrics.pythonVmemTotal
-    processTreeMetrics(3) = allMetrics.pythonRSSTotal
-    processTreeMetrics(4) = allMetrics.otherVmemTotal
-    processTreeMetrics(5) = allMetrics.otherRSSTotal
+    if (shouldLogStageExecutorProcessTreeMetrics) {
+      val allMetrics = ProcfsMetricsGetter.pTreeInfo.computeAllMetrics()
+      processTreeMetrics(0) = allMetrics.jvmVmemTotal
+      processTreeMetrics(1) = allMetrics.jvmRSSTotal
+      processTreeMetrics(2) = allMetrics.pythonVmemTotal
+      processTreeMetrics(3) = allMetrics.pythonRSSTotal
+      processTreeMetrics(4) = allMetrics.otherVmemTotal
+      processTreeMetrics(5) = allMetrics.otherRSSTotal
+    }
     processTreeMetrics
   }
 }
@@ -140,7 +146,6 @@ private[spark] object ExecutorMetricType {
     ProcessTreeMetrics
   )
 
-
   val (metricToOffset, numMetrics) = {
     var numberOfMetrics = 0
     val definedMetricsAndOffset = mutable.LinkedHashMap.empty[String, Int]

From e01779cff25f5f465350f076d9d05c61e4931e71 Mon Sep 17 00:00:00 2001
From: Reza Safi
Date: Thu, 13 Dec 2018 11:53:48 -0500
Subject: [PATCH 2/6] Fix the style issues and add a cache for metrics

---
 .../spark/executor/ProcfsMetricsGetter.scala  | 67 +++++++++++--------
 .../spark/executor/ProcfsMetricsSource.scala  | 36 +++++-----
 .../spark/internal/config/package.scala       |  2 +-
 .../spark/metrics/ExecutorMetricType.scala    |  2 +-
 .../executor/ProcfsMetricsGetterSuite.scala   |  2 +-
 5 files changed, 63 insertions(+), 46 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
index 91ae8f6f6e4bc..b7db39811ff1f 100644
--- a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
@@ -37,7 +37,9 @@ private[spark] case class ProcfsMetrics(
     pythonVmemTotal: Long,
     pythonRSSTotal: Long,
     otherVmemTotal: Long,
-    otherRSSTotal: Long)
+    otherRSSTotal: Long,
+    timeStamp: Long)
+
 
 // Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop
 // project.
@@ -47,6 +49,14 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
   private val pageSize = computePageSize()
   private var isAvailable: Boolean = isProcfsAvailable
   private val pid = computePid()
+  var cachedAllMetric = ProcfsMetrics(0, 0, 0, 0, 0, 0, 0)
+  private val HEARTBEAT_INTERVAL_MS = if (testing) {
+    0
+  } else {
+    SparkEnv.get.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL)
+  }
+
+
 
   private lazy val isProcfsAvailable: Boolean = {
     if (testing) {
@@ -64,19 +74,11 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
         SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
       val shouldAddProcessTreeMetricsToMetricsSet =
         SparkEnv.get.conf.get(config.METRICS_PROCESS_TREE_METRICS)
-      val pickEitherUIOrMetricsSet = shouldLogStageExecutorProcessTreeMetrics ^
-        shouldAddProcessTreeMetricsToMetricsSet
-      val areBothUIMetricsEnabled = shouldLogStageExecutorProcessTreeMetrics &&
+      val pickAnyOfUIOrMetricsSet = shouldLogStageExecutorProcessTreeMetrics ||
         shouldAddProcessTreeMetricsToMetricsSet
-      if (areBothUIMetricsEnabled) {
-        logWarning("You have enabled " +
-          "both spark.eventLog.logStageExecutorProcessTreeMetrics.enabled" +
-          " and spark.metrics.logStageExecutorProcessTreeMetrics.enabled. This isn't " +
-          "allowed. As a result Procfs metrics won't be reported to the UI or the metrics set")
-      }
-      (procDirExists.get && shouldLogStageExecutorMetrics && pickEitherUIOrMetricsSet) ||
-        (procDirExists.get && !shouldLogStageExecutorMetrics &&
-        pickEitherUIOrMetricsSet && shouldAddProcessTreeMetricsToMetricsSet)
+
+      (procDirExists.get && shouldLogStageExecutorMetrics && pickAnyOfUIOrMetricsSet) ||
+        (procDirExists.get && shouldAddProcessTreeMetricsToMetricsSet)
     }
   }
 
@@ -195,19 +197,22 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
       if (procInfoSplit(1).toLowerCase(Locale.US).contains("java")) {
         allMetrics.copy(
           jvmVmemTotal = allMetrics.jvmVmemTotal + vmem,
-          jvmRSSTotal = allMetrics.jvmRSSTotal + (rssMem)
+          jvmRSSTotal = allMetrics.jvmRSSTotal + (rssMem),
+          timeStamp = System.currentTimeMillis
         )
       }
       else if (procInfoSplit(1).toLowerCase(Locale.US).contains("python")) {
         allMetrics.copy(
           pythonVmemTotal = allMetrics.pythonVmemTotal + vmem,
-          pythonRSSTotal = allMetrics.pythonRSSTotal + (rssMem)
+          pythonRSSTotal = allMetrics.pythonRSSTotal + (rssMem),
+          timeStamp = System.currentTimeMillis
        )
       }
       else {
         allMetrics.copy(
           otherVmemTotal = allMetrics.otherVmemTotal + vmem,
-          otherRSSTotal = allMetrics.otherRSSTotal + (rssMem)
+          otherRSSTotal = allMetrics.otherRSSTotal + (rssMem),
+          timeStamp = System.currentTimeMillis
         )
       }
     }
@@ -215,25 +220,33 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
       case f: IOException =>
         logWarning("There was a problem with reading" +
           " the stat file of the process. ", f)
-        ProcfsMetrics(0, 0, 0, 0, 0, 0)
+        ProcfsMetrics(0, 0, 0, 0, 0, 0, System.currentTimeMillis)
     }
   }
 
   private[spark] def computeAllMetrics(): ProcfsMetrics = {
     if (!isAvailable) {
-      return ProcfsMetrics(0, 0, 0, 0, 0, 0)
+      return ProcfsMetrics(0, 0, 0, 0, 0, 0, System.currentTimeMillis)
     }
-    val pids = computeProcessTree
-    var allMetrics = ProcfsMetrics(0, 0, 0, 0, 0, 0)
-    for (p <- pids) {
-      allMetrics = addProcfsMetricsFromOneProcess(allMetrics, p)
-      // if we had an error getting any of the metrics, we don't want to report partial metrics, as
-      // that would be misleading.
-      if (!isAvailable) {
-        return ProcfsMetrics(0, 0, 0, 0, 0, 0)
+    val lastMetricComputation = System.currentTimeMillis() - cachedAllMetric.timeStamp
+    // Check whether we have computed the metrics in the past 1s
+    // ToDo: Should we make this configurable?
+    if(lastMetricComputation > Math.min(1000, HEARTBEAT_INTERVAL_MS)) {
+      val pids = computeProcessTree
+      var allMetrics = ProcfsMetrics(0, 0, 0, 0, 0, 0, System.currentTimeMillis)
+      for (p <- pids) {
+        allMetrics = addProcfsMetricsFromOneProcess(allMetrics, p)
+        // if we had an error getting any of the metrics, we don't
+        // want to report partial metrics, as that would be misleading.
+        if (!isAvailable) {
+          return ProcfsMetrics(0, 0, 0, 0, 0, 0, System.currentTimeMillis)
+        }
       }
+      allMetrics
+    }
+    else {
+      cachedAllMetric
     }
-    allMetrics
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsSource.scala b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsSource.scala
index bacb1143280d9..051c72f94c8b8 100644
--- a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsSource.scala
@@ -19,14 +19,15 @@ package org.apache.spark.executor
 
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
+import org.apache.spark.SparkEnv
 import org.apache.spark.internal.config
 import org.apache.spark.metrics.source.Source
-import org.apache.spark.SparkEnv
 
 private[executor] class ProcfsMetricsSource extends Source {
   override val sourceName = "procfs"
-  override val metricRegistry = new MetricRegistry()
+  // We use numMetrics for tracking to only call computeAllMetrics once per set of metrics
   var numMetrics: Int = 0
+  override val metricRegistry = new MetricRegistry()
   var metrics: Map[String, Long] = Map.empty
   val shouldAddProcessTreeMetricsToMetricsSet =
     SparkEnv.get.conf.get(config.METRICS_PROCESS_TREE_METRICS)
@@ -35,30 +36,33 @@ private[executor] class ProcfsMetricsSource extends Source {
     if (numMetrics == 0) {
       metrics = Map.empty
       val p = ProcfsMetricsGetter.pTreeInfo.computeAllMetrics()
-      metrics = Map("ProcessTreeJVMVMemory" -> p.jvmVmemTotal,
-        "ProcessTreeJVMRSSMemory" -> p.jvmRSSTotal,
-        "ProcessTreePythonVMemory" -> p.pythonVmemTotal,
-        "ProcessTreePythonRSSMemory" -> p.pythonRSSTotal,
-        "ProcessTreeOtherVMemory" -> p.otherVmemTotal,
-        "ProcessTreeOtherRSSMemory" -> p.otherRSSTotal)
+      metrics = Map(
+        "JVMVMemory" -> p.jvmVmemTotal,
+        "JVMRSSMemory" -> p.jvmRSSTotal,
+        "PythonVMemory" -> p.pythonVmemTotal,
+        "PythonRSSMemory" -> p.pythonRSSTotal,
+        "OtherVMemory" -> p.otherVmemTotal,
+        "OtherRSSMemory" -> p.otherRSSTotal)
     }
     numMetrics = numMetrics + 1
     if (numMetrics == 6) {
-      numMetrics = 0}
+      numMetrics = 0
+    }
     metrics
   }
-  private def registerProcfsMetrics[Long]( name: String) = {
+
+  private def registerProcfsMetrics[Long](name: String) = {
     metricRegistry.register(MetricRegistry.name("processTree", name), new Gauge[Long] {
       override def getValue: Long = getProcfsMetrics(name).asInstanceOf[Long]
     })
   }
 
   if (shouldAddProcessTreeMetricsToMetricsSet) {
-    registerProcfsMetrics("ProcessTreeJVMVMemory")
-    registerProcfsMetrics("ProcessTreeJVMRSSMemory")
-    registerProcfsMetrics("ProcessTreePythonVMemory")
-    registerProcfsMetrics("ProcessTreePythonRSSMemory")
-    registerProcfsMetrics("ProcessTreeOtherVMemory")
-    registerProcfsMetrics("ProcessTreeOtherRSSMemory")
+    registerProcfsMetrics("JVMVMemory")
+    registerProcfsMetrics("JVMRSSMemory")
+    registerProcfsMetrics("PythonVMemory")
+    registerProcfsMetrics("PythonRSSMemory")
+    registerProcfsMetrics("OtherVMemory")
+    registerProcfsMetrics("OtherRSSMemory")
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 1f7cc6fdbcdef..6ed5a7f336bd1 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -99,7 +99,7 @@ package object config {
     .createWithDefault(false)
 
   private[spark] val METRICS_PROCESS_TREE_METRICS =
-    ConfigBuilder("spark.metrics.logStageExecutorProcessTreeMetrics.enabled")
+    ConfigBuilder("spark.metrics.AddProcessTreeMetricsToMetricsSet.enabled")
       .booleanConf
       .createWithDefault(false)
 
diff --git a/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala b/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala
index 0b71f0d157ab7..bf6320e744e56 100644
--- a/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala
@@ -21,10 +21,10 @@ import javax.management.ObjectName
 
 import scala.collection.mutable
 
+import org.apache.spark.SparkEnv
 import org.apache.spark.executor.ProcfsMetricsGetter
 import org.apache.spark.internal.config
 import org.apache.spark.memory.MemoryManager
-import org.apache.spark.SparkEnv
 
 
 /**
diff --git a/core/src/test/scala/org/apache/spark/executor/ProcfsMetricsGetterSuite.scala b/core/src/test/scala/org/apache/spark/executor/ProcfsMetricsGetterSuite.scala
index 9ed1497db5e1d..7b9e30c651fc7 100644
--- a/core/src/test/scala/org/apache/spark/executor/ProcfsMetricsGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ProcfsMetricsGetterSuite.scala
@@ -25,7 +25,7 @@ class ProcfsMetricsGetterSuite extends SparkFunSuite {
   val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics"))
 
   test("testGetProcessInfo") {
-    var r = ProcfsMetrics(0, 0, 0, 0, 0, 0)
+    var r = ProcfsMetrics(0, 0, 0, 0, 0, 0, 0)
     r = p.addProcfsMetricsFromOneProcess(r, 26109)
     assert(r.jvmVmemTotal == 4769947648L)
     assert(r.jvmRSSTotal == 262610944)

From 7ee6965b1cffb03990178ccd581260ca6d0f3894 Mon Sep 17 00:00:00 2001
From: Reza Safi
Date: Mon, 17 Dec 2018 15:35:05 -0500
Subject: [PATCH 3/6] Fix the forgotten cache statement

---
 .../scala/org/apache/spark/executor/ProcfsMetricsGetter.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
index b7db39811ff1f..e63f6410d952e 100644
--- a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
@@ -239,9 +239,11 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
         // if we had an error getting any of the metrics, we don't
         // want to report partial metrics, as that would be misleading.
        if (!isAvailable) {
-          return ProcfsMetrics(0, 0, 0, 0, 0, 0, System.currentTimeMillis)
+          cachedAllMetric = ProcfsMetrics(0, 0, 0, 0, 0, 0, System.currentTimeMillis)
+          return cachedAllMetric
         }
       }
+      cachedAllMetric = allMetrics
       allMetrics
     }
     else {

From 7a21efc5d7f477230f125a13e74ee120e4d9b8da Mon Sep 17 00:00:00 2001
From: Reza Safi
Date: Tue, 18 Dec 2018 10:58:52 -0500
Subject: [PATCH 4/6] Some style comments and some other review comments applied

---
 .../spark/executor/ProcfsMetricsGetter.scala    | 13 +------------
 .../spark/executor/ProcfsMetricsSource.scala    | 17 ++++++++++-------
 .../apache/spark/internal/config/package.scala  |  2 +-
 .../spark/metrics/ExecutorMetricType.scala      |  4 +++-
 4 files changed, 15 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
index e63f6410d952e..f9cd7720b2aed 100644
--- a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
@@ -57,7 +57,6 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
   }
 
 
-
   private lazy val isProcfsAvailable: Boolean = {
     if (testing) {
       true
@@ -68,17 +67,7 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
           logWarning("Exception checking for procfs dir", ioe)
          false
       }
-      val shouldLogStageExecutorMetrics =
-        SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
-      val shouldLogStageExecutorProcessTreeMetrics =
-        SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
-      val shouldAddProcessTreeMetricsToMetricsSet =
-        SparkEnv.get.conf.get(config.METRICS_PROCESS_TREE_METRICS)
-      val pickAnyOfUIOrMetricsSet = shouldLogStageExecutorProcessTreeMetrics ||
-        shouldAddProcessTreeMetricsToMetricsSet
-
-      (procDirExists.get && shouldLogStageExecutorMetrics && pickAnyOfUIOrMetricsSet) ||
-        (procDirExists.get && shouldAddProcessTreeMetricsToMetricsSet)
+      procDirExists.get
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsSource.scala b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsSource.scala
index 051c72f94c8b8..820a4e26cf2bd 100644
--- a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsSource.scala
@@ -25,15 +25,18 @@ import org.apache.spark.metrics.source.Source
 
 private[executor] class ProcfsMetricsSource extends Source {
   override val sourceName = "procfs"
-  // We use numMetrics for tracking to only call computeAllMetrics once per set of metrics
-  var numMetrics: Int = 0
+  // We use the following var to only call computeAllMetrics once per
+  // set of procfs metrics. This is because a Metrics system gauge that
+  // returns a set of metrics can't be used without significant changes to
+  // ProcfsMetricsGetter.
+  var numOfRequestsToGetProcfs: Int = 0
   override val metricRegistry = new MetricRegistry()
   var metrics: Map[String, Long] = Map.empty
   val shouldAddProcessTreeMetricsToMetricsSet =
     SparkEnv.get.conf.get(config.METRICS_PROCESS_TREE_METRICS)
 
   private def getProcfsMetrics: Map[String, Long] = {
-    if (numMetrics == 0) {
+    if (numOfRequestsToGetProcfs == 0) {
       metrics = Map.empty
       val p = ProcfsMetricsGetter.pTreeInfo.computeAllMetrics()
       metrics = Map(
@@ -44,10 +47,10 @@ private[executor] class ProcfsMetricsSource extends Source {
         "OtherVMemory" -> p.otherVmemTotal,
         "OtherRSSMemory" -> p.otherRSSTotal)
     }
-    numMetrics = numMetrics + 1
-    if (numMetrics == 6) {
-      numMetrics = 0
-    }
+    // We have 6 metrics in Procfs, so we just need to call computeAllMetrics
+    // once every 6 requests rather than every time the Metrics system needs
+    // the value of a metric
+    numOfRequestsToGetProcfs = (numOfRequestsToGetProcfs + 1) % 6
     metrics
   }
 
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 690429ea10034..8b79473730b34 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -99,7 +99,7 @@ package object config {
     .createWithDefault(false)
 
   private[spark] val METRICS_PROCESS_TREE_METRICS =
-    ConfigBuilder("spark.metrics.AddProcessTreeMetricsToMetricsSet.enabled")
+    ConfigBuilder("spark.metrics.addProcessTreeMetricsToMetricsSet.enabled")
      .booleanConf
      .createWithDefault(false)
 
diff --git a/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala b/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala
index bf6320e744e56..3ddc71a81d086 100644
--- a/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala
@@ -88,11 +88,13 @@ case object ProcessTreeMetrics extends ExecutorMetricType {
     "ProcessTreePythonRSSMemory",
     "ProcessTreeOtherVMemory",
     "ProcessTreeOtherRSSMemory")
+  val shouldLogStageExecutorMetrics =
+    SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
   val shouldLogStageExecutorProcessTreeMetrics =
     SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
   override private[spark] def getMetricValues(memoryManager: MemoryManager): Array[Long] = {
     val processTreeMetrics = new Array[Long](names.length)
-    if (shouldLogStageExecutorProcessTreeMetrics) {
+    if (shouldLogStageExecutorMetrics && shouldLogStageExecutorProcessTreeMetrics) {
       val allMetrics = ProcfsMetricsGetter.pTreeInfo.computeAllMetrics()
       processTreeMetrics(0) = allMetrics.jvmVmemTotal
       processTreeMetrics(1) = allMetrics.jvmRSSTotal

From 18fe510a2ad6cd97cb5f2c253ed3ef3c90612bcc Mon Sep 17 00:00:00 2001
From: Reza Safi
Date: Wed, 19 Dec 2018 12:09:24 -0500
Subject: [PATCH 5/6] Removing timestamp from case class per reviewers' request and adding a synchronized block

---
 .../spark/executor/ProcfsMetricsGetter.scala  | 60 +++++++++++--------
 .../executor/ProcfsMetricsGetterSuite.scala   |  2 +-
 2 files changed, 36 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
index f9cd7720b2aed..14d12063960ff 100644
--- a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
@@ -37,8 +37,7 @@ private[spark] case class ProcfsMetrics(
     pythonVmemTotal: Long,
     pythonRSSTotal: Long,
     otherVmemTotal: Long,
-    otherRSSTotal: Long,
-    timeStamp: Long)
+    otherRSSTotal: Long)
 
 
 // Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop
@@ -49,7 +48,8 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
   private val pageSize = computePageSize()
   private var isAvailable: Boolean = isProcfsAvailable
   private val pid = computePid()
-  var cachedAllMetric = ProcfsMetrics(0, 0, 0, 0, 0, 0, 0)
+  var cachedAllMetric = ProcfsMetrics(0, 0, 0, 0, 0, 0)
+  var lastimeMetricsComputed = 0L
   private val HEARTBEAT_INTERVAL_MS = if (testing) {
     0
   } else {
@@ -186,22 +186,19 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
       if (procInfoSplit(1).toLowerCase(Locale.US).contains("java")) {
         allMetrics.copy(
           jvmVmemTotal = allMetrics.jvmVmemTotal + vmem,
-          jvmRSSTotal = allMetrics.jvmRSSTotal + (rssMem),
-          timeStamp = System.currentTimeMillis
+          jvmRSSTotal = allMetrics.jvmRSSTotal + (rssMem)
         )
       }
       else if (procInfoSplit(1).toLowerCase(Locale.US).contains("python")) {
         allMetrics.copy(
           pythonVmemTotal = allMetrics.pythonVmemTotal + vmem,
-          pythonRSSTotal = allMetrics.pythonRSSTotal + (rssMem),
-          timeStamp = System.currentTimeMillis
+          pythonRSSTotal = allMetrics.pythonRSSTotal + (rssMem)
         )
       }
       else {
         allMetrics.copy(
           otherVmemTotal = allMetrics.otherVmemTotal + vmem,
-          otherRSSTotal = allMetrics.otherRSSTotal + (rssMem),
-          timeStamp = System.currentTimeMillis
+          otherRSSTotal = allMetrics.otherRSSTotal + (rssMem)
         )
       }
     }
@@ -209,31 +206,44 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
       case f: IOException =>
         logWarning("There was a problem with reading" +
           " the stat file of the process. ", f)
-        ProcfsMetrics(0, 0, 0, 0, 0, 0, System.currentTimeMillis)
+        ProcfsMetrics(0, 0, 0, 0, 0, 0)
     }
   }
 
+  private[spark] def isCacheValid(): Boolean = {
+    val lastMetricComputation = System.currentTimeMillis() - lastimeMetricsComputed
+    // ToDo: Should we make this configurable?
+    return Math.min(1000, HEARTBEAT_INTERVAL_MS) > lastMetricComputation
+  }
+
   private[spark] def computeAllMetrics(): ProcfsMetrics = {
     if (!isAvailable) {
-      return ProcfsMetrics(0, 0, 0, 0, 0, 0, System.currentTimeMillis)
+      lastimeMetricsComputed = System.currentTimeMillis
+      cachedAllMetric = ProcfsMetrics(0, 0, 0, 0, 0, 0)
+      return ProcfsMetrics(0, 0, 0, 0, 0, 0)
     }
-    val lastMetricComputation = System.currentTimeMillis() - cachedAllMetric.timeStamp
-    // Check whether we have computed the metrics in the past 1s
-    // ToDo: Should we make this configurable?
-    if(lastMetricComputation > Math.min(1000, HEARTBEAT_INTERVAL_MS)) {
-      val pids = computeProcessTree
-      var allMetrics = ProcfsMetrics(0, 0, 0, 0, 0, 0, System.currentTimeMillis)
-      for (p <- pids) {
-        allMetrics = addProcfsMetricsFromOneProcess(allMetrics, p)
-        // if we had an error getting any of the metrics, we don't
-        // want to report partial metrics, as that would be misleading.
-        if (!isAvailable) {
-          cachedAllMetric = ProcfsMetrics(0, 0, 0, 0, 0, 0, System.currentTimeMillis)
+
+    if (!isCacheValid) {
+      this.synchronized {
+        if (isCacheValid) {
           return cachedAllMetric
         }
+        val pids = computeProcessTree
+        var allMetrics = ProcfsMetrics(0, 0, 0, 0, 0, 0)
+        for (p <- pids) {
+          allMetrics = addProcfsMetricsFromOneProcess(allMetrics, p)
+          // if we had an error getting any of the metrics, we don't
+          // want to report partial metrics, as that would be misleading.
+          if (!isAvailable) {
+            lastimeMetricsComputed = System.currentTimeMillis
+            cachedAllMetric = ProcfsMetrics(0, 0, 0, 0, 0, 0)
+            return cachedAllMetric
+          }
+        }
+        lastimeMetricsComputed = System.currentTimeMillis
+        cachedAllMetric = allMetrics
+        allMetrics
       }
-      cachedAllMetric = allMetrics
-      allMetrics
     }
     else {
       cachedAllMetric
diff --git a/core/src/test/scala/org/apache/spark/executor/ProcfsMetricsGetterSuite.scala b/core/src/test/scala/org/apache/spark/executor/ProcfsMetricsGetterSuite.scala
index 7b9e30c651fc7..9ed1497db5e1d 100644
--- a/core/src/test/scala/org/apache/spark/executor/ProcfsMetricsGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ProcfsMetricsGetterSuite.scala
@@ -25,7 +25,7 @@ class ProcfsMetricsGetterSuite extends SparkFunSuite {
   val p = new ProcfsMetricsGetter(getTestResourcePath("ProcfsMetrics"))
 
   test("testGetProcessInfo") {
-    var r = ProcfsMetrics(0, 0, 0, 0, 0, 0, 0)
+    var r = ProcfsMetrics(0, 0, 0, 0, 0, 0)
     r = p.addProcfsMetricsFromOneProcess(r, 26109)
     assert(r.jvmVmemTotal == 4769947648L)
     assert(r.jvmRSSTotal == 262610944)

From e3b23b87656c06c8b5a037c0b37e6bd1355dc3d6 Mon Sep 17 00:00:00 2001
From: Reza Safi
Date: Fri, 25 Jan 2019 13:37:29 -0500
Subject: [PATCH 6/6] Review comments addressed

---
 .../spark/executor/ProcfsMetricsGetter.scala    | 18 +++++++++---------
 .../apache/spark/internal/config/package.scala  |  5 +++++
 2 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
index 14d12063960ff..1c5f335e94f62 100644
--- a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala
@@ -48,15 +48,16 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
   private val pageSize = computePageSize()
   private var isAvailable: Boolean = isProcfsAvailable
   private val pid = computePid()
-  var cachedAllMetric = ProcfsMetrics(0, 0, 0, 0, 0, 0)
-  var lastimeMetricsComputed = 0L
-  private val HEARTBEAT_INTERVAL_MS = if (testing) {
+  @volatile private var cachedAllMetric = ProcfsMetrics(0, 0, 0, 0, 0, 0)
+  @volatile private var lastimeMetricsComputed = 0L
+  private val PROCESS_TREE_METRICS_RECOMPUTE_WAIT_MS = if (testing) {
     0
   } else {
-    SparkEnv.get.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL)
+    SparkEnv.get.conf.get(config.PROCESS_TREE_METRICS_RECOMPUTE_WAIT)
   }
 
+
 
   private lazy val isProcfsAvailable: Boolean = {
     if (testing) {
       true
@@ -211,16 +212,16 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
   }
 
   private[spark] def isCacheValid(): Boolean = {
-    val lastMetricComputation = System.currentTimeMillis() - lastimeMetricsComputed
+    val metricsAge = System.currentTimeMillis() - lastimeMetricsComputed
     // ToDo: Should we make this configurable?
-    return Math.min(1000, HEARTBEAT_INTERVAL_MS) > lastMetricComputation
+    return PROCESS_TREE_METRICS_RECOMPUTE_WAIT_MS > metricsAge
   }
 
   private[spark] def computeAllMetrics(): ProcfsMetrics = {
     if (!isAvailable) {
       lastimeMetricsComputed = System.currentTimeMillis
       cachedAllMetric = ProcfsMetrics(0, 0, 0, 0, 0, 0)
-      return ProcfsMetrics(0, 0, 0, 0, 0, 0)
+      return cachedAllMetric
     }
 
     if (!isCacheValid) {
@@ -244,8 +245,7 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
         cachedAllMetric = allMetrics
         allMetrics
       }
-    }
-    else {
+    } else {
       cachedAllMetric
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 8b79473730b34..48812982e264d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -103,6 +103,11 @@ package object config {
     .booleanConf
     .createWithDefault(false)
 
+  private[spark] val PROCESS_TREE_METRICS_RECOMPUTE_WAIT =
+    ConfigBuilder("spark.metrics.recomputeWait")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1s")
+
   private[spark] val EVENT_LOG_OVERWRITE =
     ConfigBuilder("spark.eventLog.overwrite").booleanConf.createWithDefault(false)
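
A minimal usage sketch (not part of the patch series above): assuming the configuration keys
as they stand after PATCH 4/6 and PATCH 6/6, an application could opt into the procfs-backed
"processTree" gauges and tune the recompute window as below. The event-log flag shown here
additionally requires the existing stage-executor-metrics setting (config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
to take effect; defaults and exact behavior are whatever the merged patches define.

    import org.apache.spark.SparkConf

    // Sketch only: enable the ProcfsMetricsSource gauges in the executor metrics system,
    // optionally also log per-stage process-tree metrics to the event log, and keep
    // computeAllMetrics() results cached for one second between recomputations.
    val conf = new SparkConf()
      .set("spark.metrics.addProcessTreeMetricsToMetricsSet.enabled", "true")
      .set("spark.eventLog.logStageExecutorProcessTreeMetrics.enabled", "true")
      .set("spark.metrics.recomputeWait", "1s")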