From a4f6ee2ba29df087eae984dc6513b10a641f4fae Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Thu, 22 Jun 2017 16:53:30 +0100 Subject: [PATCH 1/3] Dynamically register metrics from sources as they are reported --- .../apache/spark/metrics/MetricsSystem.scala | 82 +++++++++++++++---- .../spark/metrics/MetricsSystemSuite.scala | 37 +++++++-- .../streaming/StreamingContextSuite.scala | 9 +- 3 files changed, 101 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 709ce0060e150..e1719929c054e 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable -import com.codahale.metrics.{Metric, MetricRegistry} +import com.codahale.metrics.{Counter, Gauge, Histogram, Meter, Metric, MetricRegistry, MetricRegistryListener, Timer} import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.{SecurityManager, SparkConf} @@ -69,13 +69,15 @@ import org.apache.spark.util.Utils * [options] represent the specific property of this source or sink. */ private[spark] class MetricsSystem private ( - val instance: String, conf: SparkConf) extends Logging { + val instance: String, + conf: SparkConf, + registry: MetricRegistry) + extends Logging { private[this] val metricsConfig = new MetricsConfig(conf) private val sinks = new mutable.ArrayBuffer[Sink] - private val sources = new mutable.ArrayBuffer[Source] - private val registry = new MetricRegistry() + private val sourcesWithListeners = new mutable.HashMap[Source, MetricRegistryListener] private var running: Boolean = false @@ -109,6 +111,9 @@ private[spark] class MetricsSystem private ( if (running) { sinks.foreach(_.stop()) registry.removeMatching((_: String, _: Metric) => true) + sourcesWithListeners.synchronized { + sourcesWithListeners.keySet.foreach(removeSource) + } } else { logWarning("Stopping a MetricsSystem that is not running") } @@ -154,25 +159,21 @@ private[spark] class MetricsSystem private ( } else { defaultName } } - def getSourcesByName(sourceName: String): Seq[Source] = sources.synchronized { - sources.filter(_.sourceName == sourceName).toSeq + def getSourcesByName(sourceName: String): Seq[Source] = sourcesWithListeners.synchronized { + sourcesWithListeners.keySet.filter(_.sourceName == sourceName).toSeq } def registerSource(source: Source): Unit = { - sources.synchronized { - sources += source - } - try { - val regName = buildRegistryName(source) - registry.register(regName, source.metricRegistry) - } catch { - case e: IllegalArgumentException => logInfo("Metrics already registered", e) + val listener = new MetricsSystemListener(buildRegistryName(source)) + sourcesWithListeners.synchronized { + sourcesWithListeners += source -> listener } + source.metricRegistry.addListener(listener) } def removeSource(source: Source): Unit = { - sources.synchronized { - sources -= source + sourcesWithListeners.synchronized { + sourcesWithListeners.remove(source).foreach(source.metricRegistry.removeListener) } val regName = buildRegistryName(source) registry.removeMatching((name: String, _: Metric) => name.startsWith(regName)) @@ -240,6 +241,48 @@ private[spark] class MetricsSystem private ( } def metricsProperties(): Properties = metricsConfig.properties + + private[spark] class MetricsSystemListener(prefix: String) extends MetricRegistryListener { + def metricName(name: String): String = MetricRegistry.name(prefix, name) + + def registerMetric[T <: Metric](name: String, metric: T): Unit = { + try { + registry.register(metricName(name), metric) + } catch { + case e: IllegalArgumentException => logInfo("Metrics already registered", e) + } + } + + override def onHistogramAdded(name: String, histogram: Histogram): Unit = + registerMetric(name, histogram) + + override def onCounterAdded(name: String, counter: Counter): Unit = + registerMetric(name, counter) + + override def onMeterAdded(name: String, meter: Meter): Unit = + registerMetric(name, meter) + + override def onGaugeAdded(name: String, gauge: Gauge[_]): Unit = + registerMetric(name, gauge) + + override def onTimerAdded(name: String, timer: Timer): Unit = + registerMetric(name, timer) + + override def onHistogramRemoved(name: String): Unit = + registry.remove(metricName(name)) + + override def onGaugeRemoved(name: String): Unit = + registry.remove(metricName(name)) + + override def onMeterRemoved(name: String): Unit = + registry.remove(metricName(name)) + + override def onCounterRemoved(name: String): Unit = + registry.remove(metricName(name)) + + override def onTimerRemoved(name: String): Unit = + registry.remove(metricName(name)) + } } private[spark] object MetricsSystem { @@ -257,8 +300,11 @@ private[spark] object MetricsSystem { } } - def createMetricsSystem(instance: String, conf: SparkConf): MetricsSystem = { - new MetricsSystem(instance, conf) + def createMetricsSystem( + instance: String, + conf: SparkConf, + registry: MetricRegistry = new MetricRegistry): MetricsSystem = { + new MetricsSystem(instance, conf, registry) } } diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index 80dc4ff758666..a9a4a10738ee7 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -20,8 +20,9 @@ package org.apache.spark.metrics import java.util.Properties import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashMap -import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.{Gauge, MetricRegistry, MetricRegistryListener} import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} @@ -44,10 +45,11 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM test("MetricsSystem with default config") { val metricsSystem = MetricsSystem.createMetricsSystem("default", conf) metricsSystem.start() - val sources = PrivateMethod[ArrayBuffer[Source]](Symbol("sources")) + val sources = + PrivateMethod[HashMap[Source, MetricRegistryListener]](Symbol("sourcesWithListeners")) val sinks = PrivateMethod[ArrayBuffer[Sink]](Symbol("sinks")) - assert(metricsSystem.invokePrivate(sources()).length === StaticSources.allSources.length) + assert(metricsSystem.invokePrivate(sources()).size === StaticSources.allSources.length) assert(metricsSystem.invokePrivate(sinks()).length === 0) assert(metricsSystem.getServletHandlers.nonEmpty) } @@ -55,16 +57,17 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM test("MetricsSystem with sources add") { val metricsSystem = MetricsSystem.createMetricsSystem("test", conf) metricsSystem.start() - val sources = PrivateMethod[ArrayBuffer[Source]](Symbol("sources")) + val sources = + PrivateMethod[HashMap[Source, MetricRegistryListener]](Symbol("sourcesWithListeners")) val sinks = PrivateMethod[ArrayBuffer[Sink]](Symbol("sinks")) - assert(metricsSystem.invokePrivate(sources()).length === StaticSources.allSources.length) + assert(metricsSystem.invokePrivate(sources()).size === StaticSources.allSources.length) assert(metricsSystem.invokePrivate(sinks()).length === 1) assert(metricsSystem.getServletHandlers.nonEmpty) val source = new MasterSource(null) metricsSystem.registerSource(source) - assert(metricsSystem.invokePrivate(sources()).length === StaticSources.allSources.length + 1) + assert(metricsSystem.invokePrivate(sources()).size === StaticSources.allSources.length + 1) } test("MetricsSystem with Driver instance") { @@ -281,6 +284,28 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM assert(metricsSystem.invokePrivate(sinks()).length === 1) } + + test("MetricsSystem registers dynamically added metrics") { + val registry = new MetricRegistry() + val source = new Source { + override val sourceName = "dummySource" + override val metricRegistry = new MetricRegistry() + } + + val instanceName = "testInstance" + val metricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, registry) + metricsSystem.registerSource(source) + assert(!registry.getNames.contains("dummySource.newMetric"), "Metric shouldn't be registered") + + source.metricRegistry.register( + "newMetric", + new Gauge[Integer] { + override def getValue: Integer = 1 + }) + assert( + registry.getNames.contains("dummySource.newMetric"), + "Metric should have been registered") + } } class ThreeParameterConstructorSink( diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index a45d3b4bc8a94..f856ce3fd085f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -23,9 +23,10 @@ import java.util.Locale import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicInteger -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashMap import scala.collection.mutable.Queue +import com.codahale.metrics.MetricRegistryListener import org.apache.commons.io.FileUtils import org.scalatest.{Assertions, PrivateMethodTester} import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} @@ -1027,8 +1028,10 @@ object testPackage extends Assertions { * This includes methods to access private methods and fields in StreamingContext and MetricsSystem */ private object StreamingContextSuite extends PrivateMethodTester { - private val _sources = PrivateMethod[ArrayBuffer[Source]](Symbol("sources")) - private def getSources(metricsSystem: MetricsSystem): ArrayBuffer[Source] = { + private val _sources = + PrivateMethod[HashMap[Source, MetricRegistryListener]](Symbol("sourcesWithListeners")) + private def getSources( + metricsSystem: MetricsSystem): HashMap[Source, MetricRegistryListener] = { metricsSystem.invokePrivate(_sources()) } private val _streamingSource = PrivateMethod[StreamingSource](Symbol("streamingSource")) From 4d7f91fb3317116fc99f6c06ca8935c111a1493e Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Wed, 18 Sep 2024 00:18:03 +0100 Subject: [PATCH 2/3] code review --- .../apache/spark/metrics/MetricsSystem.scala | 21 +++++++++++-------- .../spark/metrics/MetricsSystemSuite.scala | 11 +++++----- .../streaming/StreamingContextSuite.scala | 14 +++++-------- 3 files changed, 22 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index e1719929c054e..1c773a84d2b57 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -77,7 +77,7 @@ private[spark] class MetricsSystem private ( private[this] val metricsConfig = new MetricsConfig(conf) private val sinks = new mutable.ArrayBuffer[Sink] - private val sourcesWithListeners = new mutable.HashMap[Source, MetricRegistryListener] + private val sources = new mutable.ArrayBuffer[(Source, MetricRegistryListener)] private var running: Boolean = false @@ -111,8 +111,8 @@ private[spark] class MetricsSystem private ( if (running) { sinks.foreach(_.stop()) registry.removeMatching((_: String, _: Metric) => true) - sourcesWithListeners.synchronized { - sourcesWithListeners.keySet.foreach(removeSource) + sources.synchronized { + sources.foreach(s => removeSource(s._1)) } } else { logWarning("Stopping a MetricsSystem that is not running") @@ -159,21 +159,24 @@ private[spark] class MetricsSystem private ( } else { defaultName } } - def getSourcesByName(sourceName: String): Seq[Source] = sourcesWithListeners.synchronized { - sourcesWithListeners.keySet.filter(_.sourceName == sourceName).toSeq + def getSourcesByName(sourceName: String): Seq[Source] = sources.synchronized { + sources.filter(s => s._1.sourceName == sourceName).map(_._1).toSeq } def registerSource(source: Source): Unit = { val listener = new MetricsSystemListener(buildRegistryName(source)) - sourcesWithListeners.synchronized { - sourcesWithListeners += source -> listener + sources.synchronized { + sources += (source, listener) } source.metricRegistry.addListener(listener) } def removeSource(source: Source): Unit = { - sourcesWithListeners.synchronized { - sourcesWithListeners.remove(source).foreach(source.metricRegistry.removeListener) + sources.synchronized { + val sourceIdx = sources.indexWhere(s => s._1 == source) + if (sourceIdx != -1) { + source.metricRegistry.removeListener(sources.remove(sourceIdx)._2) + } } val regName = buildRegistryName(source) registry.removeMatching((name: String, _: Metric) => name.startsWith(regName)) diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index a9a4a10738ee7..00d9a6f1fbdbb 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.metrics import java.util.Properties import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.HashMap import com.codahale.metrics.{Gauge, MetricRegistry, MetricRegistryListener} import org.scalatest.{BeforeAndAfter, PrivateMethodTester} @@ -46,10 +45,10 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM val metricsSystem = MetricsSystem.createMetricsSystem("default", conf) metricsSystem.start() val sources = - PrivateMethod[HashMap[Source, MetricRegistryListener]](Symbol("sourcesWithListeners")) + PrivateMethod[ArrayBuffer[(Source, MetricRegistryListener)]](Symbol("sources")) val sinks = PrivateMethod[ArrayBuffer[Sink]](Symbol("sinks")) - assert(metricsSystem.invokePrivate(sources()).size === StaticSources.allSources.length) + assert(metricsSystem.invokePrivate(sources()).length === StaticSources.allSources.length) assert(metricsSystem.invokePrivate(sinks()).length === 0) assert(metricsSystem.getServletHandlers.nonEmpty) } @@ -58,16 +57,16 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM val metricsSystem = MetricsSystem.createMetricsSystem("test", conf) metricsSystem.start() val sources = - PrivateMethod[HashMap[Source, MetricRegistryListener]](Symbol("sourcesWithListeners")) + PrivateMethod[ArrayBuffer[(Source, MetricRegistryListener)]](Symbol("sources")) val sinks = PrivateMethod[ArrayBuffer[Sink]](Symbol("sinks")) - assert(metricsSystem.invokePrivate(sources()).size === StaticSources.allSources.length) + assert(metricsSystem.invokePrivate(sources()).length === StaticSources.allSources.length) assert(metricsSystem.invokePrivate(sinks()).length === 1) assert(metricsSystem.getServletHandlers.nonEmpty) val source = new MasterSource(null) metricsSystem.registerSource(source) - assert(metricsSystem.invokePrivate(sources()).size === StaticSources.allSources.length + 1) + assert(metricsSystem.invokePrivate(sources()).length === StaticSources.allSources.length + 1) } test("MetricsSystem with Driver instance") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index f856ce3fd085f..3d6b5fbf1c5be 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -22,10 +22,7 @@ import java.nio.charset.StandardCharsets import java.util.Locale import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicInteger - -import scala.collection.mutable.HashMap -import scala.collection.mutable.Queue - +import scala.collection.mutable.{ArrayBuffer, Queue} import com.codahale.metrics.MetricRegistryListener import org.apache.commons.io.FileUtils import org.scalatest.{Assertions, PrivateMethodTester} @@ -33,7 +30,6 @@ import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.concurrent.Eventually._ import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ - import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.UI.UI_ENABLED @@ -378,14 +374,14 @@ class StreamingContextSuite val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem) val streamingSource = StreamingContextSuite.getStreamingSource(ssc) - assert(sources.contains(streamingSource)) + assert(sources.exists(s => s._1 == streamingSource)) assert(ssc.getState() === StreamingContextState.ACTIVE) ssc.stop() val sourcesAfterStop = StreamingContextSuite.getSources(ssc.env.metricsSystem) val streamingSourceAfterStop = StreamingContextSuite.getStreamingSource(ssc) assert(ssc.getState() === StreamingContextState.STOPPED) - assert(!sourcesAfterStop.contains(streamingSourceAfterStop)) + assert(!sourcesAfterStop.exists(s => s._1 == streamingSourceAfterStop)) } test("SPARK-28709 registering and de-registering of progressListener") { @@ -1029,9 +1025,9 @@ object testPackage extends Assertions { */ private object StreamingContextSuite extends PrivateMethodTester { private val _sources = - PrivateMethod[HashMap[Source, MetricRegistryListener]](Symbol("sourcesWithListeners")) + PrivateMethod[ArrayBuffer[(Source, MetricRegistryListener)]](Symbol("sources")) private def getSources( - metricsSystem: MetricsSystem): HashMap[Source, MetricRegistryListener] = { + metricsSystem: MetricsSystem): ArrayBuffer[(Source, MetricRegistryListener)] = { metricsSystem.invokePrivate(_sources()) } private val _streamingSource = PrivateMethod[StreamingSource](Symbol("streamingSource")) From 67b52858a5ee780bc2791a26cc7eed0204c75dca Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Wed, 18 Sep 2024 01:21:16 +0100 Subject: [PATCH 3/3] fewer changes --- .../apache/spark/metrics/MetricsSystem.scala | 66 ++++--------------- .../spark/metrics/MetricsSystemSuite.scala | 8 +-- .../streaming/StreamingContextSuite.scala | 17 ++--- 3 files changed, 23 insertions(+), 68 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 1c773a84d2b57..523194d8a4873 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable -import com.codahale.metrics.{Counter, Gauge, Histogram, Meter, Metric, MetricRegistry, MetricRegistryListener, Timer} +import com.codahale.metrics.{Metric, MetricRegistry} import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.{SecurityManager, SparkConf} @@ -77,7 +77,7 @@ private[spark] class MetricsSystem private ( private[this] val metricsConfig = new MetricsConfig(conf) private val sinks = new mutable.ArrayBuffer[Sink] - private val sources = new mutable.ArrayBuffer[(Source, MetricRegistryListener)] + private val sources = new mutable.ArrayBuffer[Source] private var running: Boolean = false @@ -111,9 +111,6 @@ private[spark] class MetricsSystem private ( if (running) { sinks.foreach(_.stop()) registry.removeMatching((_: String, _: Metric) => true) - sources.synchronized { - sources.foreach(s => removeSource(s._1)) - } } else { logWarning("Stopping a MetricsSystem that is not running") } @@ -160,23 +157,24 @@ private[spark] class MetricsSystem private ( } def getSourcesByName(sourceName: String): Seq[Source] = sources.synchronized { - sources.filter(s => s._1.sourceName == sourceName).map(_._1).toSeq + sources.filter(_.sourceName == sourceName).toSeq } def registerSource(source: Source): Unit = { - val listener = new MetricsSystemListener(buildRegistryName(source)) sources.synchronized { - sources += (source, listener) + sources += source + } + try { + val regName = buildRegistryName(source) + registry.register(regName, source.metricRegistry) + } catch { + case e: IllegalArgumentException => logInfo("Metrics already registered", e) } - source.metricRegistry.addListener(listener) } def removeSource(source: Source): Unit = { sources.synchronized { - val sourceIdx = sources.indexWhere(s => s._1 == source) - if (sourceIdx != -1) { - source.metricRegistry.removeListener(sources.remove(sourceIdx)._2) - } + sources -= source } val regName = buildRegistryName(source) registry.removeMatching((name: String, _: Metric) => name.startsWith(regName)) @@ -244,48 +242,6 @@ private[spark] class MetricsSystem private ( } def metricsProperties(): Properties = metricsConfig.properties - - private[spark] class MetricsSystemListener(prefix: String) extends MetricRegistryListener { - def metricName(name: String): String = MetricRegistry.name(prefix, name) - - def registerMetric[T <: Metric](name: String, metric: T): Unit = { - try { - registry.register(metricName(name), metric) - } catch { - case e: IllegalArgumentException => logInfo("Metrics already registered", e) - } - } - - override def onHistogramAdded(name: String, histogram: Histogram): Unit = - registerMetric(name, histogram) - - override def onCounterAdded(name: String, counter: Counter): Unit = - registerMetric(name, counter) - - override def onMeterAdded(name: String, meter: Meter): Unit = - registerMetric(name, meter) - - override def onGaugeAdded(name: String, gauge: Gauge[_]): Unit = - registerMetric(name, gauge) - - override def onTimerAdded(name: String, timer: Timer): Unit = - registerMetric(name, timer) - - override def onHistogramRemoved(name: String): Unit = - registry.remove(metricName(name)) - - override def onGaugeRemoved(name: String): Unit = - registry.remove(metricName(name)) - - override def onMeterRemoved(name: String): Unit = - registry.remove(metricName(name)) - - override def onCounterRemoved(name: String): Unit = - registry.remove(metricName(name)) - - override def onTimerRemoved(name: String): Unit = - registry.remove(metricName(name)) - } } private[spark] object MetricsSystem { diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index 00d9a6f1fbdbb..25c2280b7ae8b 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -21,7 +21,7 @@ import java.util.Properties import scala.collection.mutable.ArrayBuffer -import com.codahale.metrics.{Gauge, MetricRegistry, MetricRegistryListener} +import com.codahale.metrics.{Gauge, MetricRegistry} import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} @@ -44,8 +44,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM test("MetricsSystem with default config") { val metricsSystem = MetricsSystem.createMetricsSystem("default", conf) metricsSystem.start() - val sources = - PrivateMethod[ArrayBuffer[(Source, MetricRegistryListener)]](Symbol("sources")) + val sources = PrivateMethod[ArrayBuffer[Source]](Symbol("sources")) val sinks = PrivateMethod[ArrayBuffer[Sink]](Symbol("sinks")) assert(metricsSystem.invokePrivate(sources()).length === StaticSources.allSources.length) @@ -56,8 +55,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM test("MetricsSystem with sources add") { val metricsSystem = MetricsSystem.createMetricsSystem("test", conf) metricsSystem.start() - val sources = - PrivateMethod[ArrayBuffer[(Source, MetricRegistryListener)]](Symbol("sources")) + val sources = PrivateMethod[ArrayBuffer[Source]](Symbol("sources")) val sinks = PrivateMethod[ArrayBuffer[Sink]](Symbol("sinks")) assert(metricsSystem.invokePrivate(sources()).length === StaticSources.allSources.length) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 3d6b5fbf1c5be..a45d3b4bc8a94 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -22,14 +22,17 @@ import java.nio.charset.StandardCharsets import java.util.Locale import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicInteger -import scala.collection.mutable.{ArrayBuffer, Queue} -import com.codahale.metrics.MetricRegistryListener + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Queue + import org.apache.commons.io.FileUtils import org.scalatest.{Assertions, PrivateMethodTester} import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.concurrent.Eventually._ import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ + import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.UI.UI_ENABLED @@ -374,14 +377,14 @@ class StreamingContextSuite val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem) val streamingSource = StreamingContextSuite.getStreamingSource(ssc) - assert(sources.exists(s => s._1 == streamingSource)) + assert(sources.contains(streamingSource)) assert(ssc.getState() === StreamingContextState.ACTIVE) ssc.stop() val sourcesAfterStop = StreamingContextSuite.getSources(ssc.env.metricsSystem) val streamingSourceAfterStop = StreamingContextSuite.getStreamingSource(ssc) assert(ssc.getState() === StreamingContextState.STOPPED) - assert(!sourcesAfterStop.exists(s => s._1 == streamingSourceAfterStop)) + assert(!sourcesAfterStop.contains(streamingSourceAfterStop)) } test("SPARK-28709 registering and de-registering of progressListener") { @@ -1024,10 +1027,8 @@ object testPackage extends Assertions { * This includes methods to access private methods and fields in StreamingContext and MetricsSystem */ private object StreamingContextSuite extends PrivateMethodTester { - private val _sources = - PrivateMethod[ArrayBuffer[(Source, MetricRegistryListener)]](Symbol("sources")) - private def getSources( - metricsSystem: MetricsSystem): ArrayBuffer[(Source, MetricRegistryListener)] = { + private val _sources = PrivateMethod[ArrayBuffer[Source]](Symbol("sources")) + private def getSources(metricsSystem: MetricsSystem): ArrayBuffer[Source] = { metricsSystem.invokePrivate(_sources()) } private val _streamingSource = PrivateMethod[StreamingSource](Symbol("streamingSource"))