From 193c542a488ceb9fc5445208ba5647bd97f3c5d2 Mon Sep 17 00:00:00 2001 From: lisurprise Date: Tue, 3 Mar 2015 13:14:11 +0800 Subject: [PATCH 1/6] remove streaming tab while closing streaming context --- .../scala/org/apache/spark/ui/WebUI.scala | 41 ++++++++++++++----- .../spark/streaming/StreamingContext.scala | 4 ++ .../spark/streaming/ui/StreamingPage.scala | 4 +- .../spark/streaming/ui/StreamingTab.scala | 5 ++- 4 files changed, 41 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 9be65a4a39a09..bc05156f55236 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -17,17 +17,17 @@ package org.apache.spark.ui +import scala.collection.mutable.HashMap import javax.servlet.http.HttpServletRequest -import scala.collection.mutable.ArrayBuffer -import scala.xml.Node - +import org.apache.spark.ui.JettyUtils._ +import org.apache.spark.util.Utils +import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.eclipse.jetty.servlet.ServletContextHandler import org.json4s.JsonAST.{JNothing, JValue} -import org.apache.spark.{Logging, SecurityManager, SparkConf} -import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.util.Utils +import scala.collection.mutable.ArrayBuffer +import scala.xml.Node /** * The top level component of the UI hierarchy that contains the server. @@ -45,6 +45,7 @@ private[spark] abstract class WebUI( protected val tabs = ArrayBuffer[WebUITab]() protected val handlers = ArrayBuffer[ServletContextHandler]() + protected val page2Handlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]] protected var serverInfo: Option[ServerInfo] = None protected val localHostName = Utils.localHostName() protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName) @@ -60,14 +61,34 @@ private[spark] abstract class WebUI( tab.pages.foreach(attachPage) tabs += tab } + + def detachTab(tab: WebUITab) { + tab.pages.foreach(detachPage) + tabs -= tab + } + + def detachPage(page: WebUIPage) { + page2Handlers.remove(page) match { + case Some(handlers) => handlers.foreach(detachHandler) + case None => + } + } + /** Attach a page to this UI. */ def attachPage(page: WebUIPage) { val pagePath = "/" + page.prefix - attachHandler(createServletHandler(pagePath, - (request: HttpServletRequest) => page.render(request), securityManager, basePath)) - attachHandler(createServletHandler(pagePath.stripSuffix("/") + "/json", - (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath)) + val renderHandler = createServletHandler(pagePath, + (request: HttpServletRequest) => page.render(request), securityManager, basePath) + val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json", + (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath) + attachHandler(renderHandler) + attachHandler(renderJsonHandler) + page2Handlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]()) + .append(renderHandler) + page2Handlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]()) + .append(renderJsonHandler) + } /** Attach a handler to this UI. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index ba3f23434f24c..1b1f231c69c9b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -553,6 +553,10 @@ class StreamingContext private[streaming] ( */ def stop(stopSparkContext: Boolean = true): Unit = synchronized { stop(stopSparkContext, false) + this.uiTab match { + case Some(tab) => StreamingTab.detachStreamingTab(this, tab) + case None => + } } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 98e9a2e639e25..bfe8086fcf8fe 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -32,7 +32,7 @@ private[ui] class StreamingPage(parent: StreamingTab) extends WebUIPage("") with Logging { private val listener = parent.listener - private val startTime = Calendar.getInstance().getTime() + private val startTime = System.currentTimeMillis() private val emptyCell = "-" /** Render the page */ @@ -47,7 +47,7 @@ private[ui] class StreamingPage(parent: StreamingTab) /** Generate basic stats of the streaming program */ private def generateBasicStats(): Seq[Node] = { - val timeSinceStart = System.currentTimeMillis() - startTime.getTime + val timeSinceStart = System.currentTimeMillis() - startTime