From fe88e758a5e309ab90e45938dd2456be4d6164e4 Mon Sep 17 00:00:00 2001 From: Gurvinder Singh Date: Thu, 26 May 2016 23:14:05 +0200 Subject: [PATCH 1/8] added support for reverse proxying request to workers and application UI. This helps in accessing SparkUI when running spark cluster in closed networks. --- core/pom.xml | 10 +++ .../scala/org/apache/spark/SparkContext.scala | 3 + .../apache/spark/deploy/master/Master.scala | 14 ++++- .../deploy/master/ui/ApplicationPage.scala | 18 ++++-- .../spark/deploy/master/ui/MasterPage.scala | 21 ++++++- .../spark/deploy/master/ui/MasterWebUI.scala | 16 +++++ .../spark/deploy/worker/ExecutorRunner.scala | 6 +- .../org/apache/spark/ui/JettyUtils.scala | 63 +++++++++++++++++++ .../spark/deploy/master/MasterSuite.scala | 28 +++++++++ docs/configuration.md | 14 +++++ pom.xml | 14 +++++ 11 files changed, 198 insertions(+), 9 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index ab6c3ce805275..be74457f169f4 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -124,6 +124,16 @@ jetty-servlet compile + + org.eclipse.jetty + jetty-proxy + compile + + + org.eclipse.jetty + jetty-client + compile + org.eclipse.jetty jetty-servlets diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 2eaeab1d807b4..d6b6bfd01fea8 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -502,6 +502,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli _applicationId = _taskScheduler.applicationId() _applicationAttemptId = taskScheduler.applicationAttemptId() _conf.set("spark.app.id", _applicationId) + if (_conf.getBoolean("spark.ui.reverseProxy", false)) { + System.setProperty("spark.ui.proxyBase", "/target/" + _applicationId) + } _ui.foreach(_.setAppId(_applicationId)) _env.blockManager.initialize(_applicationId) diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index dfffc47703ab4..203a33d9c78a3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -114,6 +114,7 @@ private[deploy] class Master( // Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue) private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue) + val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false) if (defaultCores < 1) { throw new SparkException("spark.deploy.defaultCores must be positive") } @@ -128,7 +129,14 @@ private[deploy] class Master( logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}") webUi = new MasterWebUI(this, webUiPort) webUi.bind() - masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort + if (reverseProxy) { + masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", null) + if (masterWebUiUrl == null) { + throw new SparkException("spark.ui.reverseProxyUrl must be provided") + } + } else { + masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort + } checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { self.send(CheckForWorkerTimeOut) @@ -755,6 +763,7 @@ private[deploy] class Master( workers += worker idToWorker(worker.id) = worker addressToWorker(workerAddress) = worker + if (reverseProxy) webUi.addProxyTargets(worker.id, worker.webUiAddress) true } @@ -763,6 +772,7 @@ private[deploy] class Master( worker.setState(WorkerState.DEAD) idToWorker -= worker.id addressToWorker -= worker.endpoint.address + if (reverseProxy) webUi.removeProxyTargets(worker.id) for (exec <- worker.executors.values) { logInfo("Telling app of lost executor: " + exec.id) exec.application.driver.send(ExecutorUpdated( @@ -810,6 +820,7 @@ private[deploy] 
class Master( endpointToApp(app.driver) = app addressToApp(appAddress) = app waitingApps += app + if (reverseProxy) webUi.addProxyTargets(app.id, app.desc.appUiUrl) } private def finishApplication(app: ApplicationInfo) { @@ -823,6 +834,7 @@ private[deploy] class Master( idToApp -= app.id endpointToApp -= app.driver addressToApp -= app.driver.address + if (reverseProxy) webUi.removeProxyTargets(app.id) if (completedApps.size >= RETAINED_APPLICATIONS) { val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1) completedApps.take(toRemove).foreach { a => diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 8875fc223250d..0a36c9094a528 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -77,7 +77,15 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
  • State: {app.state}
  • { if (!app.isFinished) { -
  • Application Detail UI
  • +
  • + { + if (parent.master.reverseProxy) { + Application Detail UI + } else { + Application Detail UI + } + } +
  • } } @@ -100,19 +108,21 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") } private def executorRow(executor: ExecutorDesc): Seq[Node] = { + var workerUrlRef = executor.worker.webUiAddress + if (parent.master.reverseProxy) workerUrlRef = "/target/" + executor.worker.id + "/" {executor.id} - {executor.worker.id} + {executor.worker.id} {executor.cores} {executor.memory} {executor.state} stdout + .format(workerUrlRef, executor.application.id, executor.id)}>stdout stderr + .format(workerUrlRef, executor.application.id, executor.id)}>stderr } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala index 5ed3e39edc484..6d653c4f1fe2c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala @@ -176,7 +176,13 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { private def workerRow(worker: WorkerInfo): Seq[Node] = { - {worker.id} + { + if (parent.master.reverseProxy) { + {worker.id} + } else { + {worker.id} + } + } {worker.host}:{worker.port} {worker.state} @@ -210,7 +216,11 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { if (app.isFinished) { app.desc.name } else { - {app.desc.name} + if (parent.master.reverseProxy) { + {app.desc.name} + } else { + {app.desc.name} + } } } @@ -244,7 +254,12 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { {driver.id} {killLink} {driver.submitDate} - {driver.worker.map(w => {w.id.toString}).getOrElse("None")} + {driver.worker.map(w => + if (parent.master.reverseProxy) { + {w.id.toString} + } else { + {w.id.toString} + }).getOrElse("None")} {driver.state} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 
a0727ad83fb66..898c14e031fbb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -17,6 +17,10 @@ package org.apache.spark.deploy.master.ui +import scala.collection.mutable.HashMap + +import org.eclipse.jetty.servlet.ServletContextHandler + import org.apache.spark.deploy.master.Master import org.apache.spark.internal.Logging import org.apache.spark.ui.{SparkUI, WebUI} @@ -34,6 +38,7 @@ class MasterWebUI( val masterEndpointRef = master.self val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true) + val proxyHandlers = new HashMap[String, ServletContextHandler] initialize() @@ -48,6 +53,17 @@ class MasterWebUI( attachHandler(createRedirectHandler( "/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST"))) } + + def addProxyTargets(id: String, target: String): Unit = { + val handler = createProxyHandler("/target/" + id, target) + attachHandler(handler) + proxyHandlers(id) = handler + } + + def removeProxyTargets(id: String): Unit = { + detachHandler(proxyHandlers.get(id).get) + proxyHandlers -= id + } } private[master] object MasterWebUI { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 06066248ea5d0..c462d339645f6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -156,7 +156,11 @@ private[deploy] class ExecutorRunner( // Add webUI log urls val baseUrl = - s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType=" + if (conf.getBoolean("spark.ui.reverseProxy", false)) { + s"/target/$workerId/logPage/?appId=$appId&executorId=$execId&logType=" + } else { + s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType=" + } 
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr") builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout") diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 50283f2b74a41..a4f2cf0632f06 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -25,6 +25,8 @@ import scala.collection.mutable.ArrayBuffer import scala.language.implicitConversions import scala.xml.Node +import org.eclipse.jetty.client.api.Response +import org.eclipse.jetty.proxy.ProxyServlet import org.eclipse.jetty.server.{Request, Server, ServerConnector} import org.eclipse.jetty.server.handler._ import org.eclipse.jetty.servlet._ @@ -186,6 +188,67 @@ private[spark] object JettyUtils extends Logging { contextHandler } + /** Create a handler for proxying request to Workers and Application Drivers */ + def createProxyHandler( + prefix: String, + target: String): ServletContextHandler = { + val servlet = new ProxyServlet { + override def rewriteTarget(request: HttpServletRequest): String = { + val path = request.getRequestURI(); + if (!path.startsWith(prefix)) return null + + val uri = new StringBuilder(target) + if (target.endsWith("/")) uri.setLength(uri.length() - 1) + val rest = path.substring(prefix.length()) + if (!rest.isEmpty()) + { + if (!rest.startsWith("/")) { + uri.append("/") + } + uri.append(rest) + } + + val query = request.getQueryString() + if (query != null) { + // Is there at least one path segment ? 
+ val separator = "://" + if (uri.indexOf("/", uri.indexOf(separator) + separator.length()) < 0) { + uri.append("/") + } + uri.append("?").append(query) + } + val rewrittenURI = URI.create(uri.toString()).normalize() + + if (!validateDestination(rewrittenURI.getHost(), rewrittenURI.getPort())) { + return null + } + + rewrittenURI.toString(); + } + override def filterServerResponseHeader( + clientRequest: HttpServletRequest, + serverResponse: Response, + headerName: String, + headerValue: String): String = { + if (headerName.equalsIgnoreCase("location")) { + val targetUri = serverResponse.getRequest().getURI(); + val toReplace = targetUri.getScheme() + "://" + targetUri.getAuthority(); + if (headerValue.startsWith(toReplace)) { + return clientRequest.getScheme() + "://" + clientRequest.getHeader("host") + + prefix + headerValue.substring(toReplace.length()) + } + } + super.filterServerResponseHeader(clientRequest, serverResponse, headerName, headerValue); + } + } + + val contextHandler = new ServletContextHandler + val holder = new ServletHolder(servlet) + contextHandler.setContextPath(prefix) + contextHandler.addServlet(holder, "/") + contextHandler + } + /** Add filters, if any, to the given list of ServletContextHandlers */ def addFilters(handlers: Seq[ServletContextHandler], conf: SparkConf) { val filters: Array[String] = conf.get("spark.ui.filters", "").split(',').map(_.trim()) diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 7cbe4e342eaa5..341fc3d5c5882 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -157,6 +157,34 @@ class MasterSuite extends SparkFunSuite } } + test("master/worker web ui available with reverseProxy") { + implicit val formats = org.json4s.DefaultFormats + val reverseProxyUrl = "http://localhost:8080" + val conf = new 
SparkConf() + conf.set("spark.ui.reverseProxy", "true") + conf.set("spark.ui.reverseProxyUrl", reverseProxyUrl) + val localCluster = new LocalSparkCluster(2, 2, 512, conf) + localCluster.start() + try { + eventually(timeout(5 seconds), interval(100 milliseconds)) { + val json = Source.fromURL(s"http://localhost:${localCluster.masterWebUIPort}/json") + .getLines().mkString("\n") + val JArray(workers) = (parse(json) \ "workers") + workers.size should be (2) + workers.foreach { workerSummaryJson => + val JString(workerId) = workerSummaryJson \ "id" + val url = s"http://localhost:${localCluster.masterWebUIPort}/target/${workerId}/json" + val workerResponse = parse(Source.fromURL(url) + .getLines().mkString("\n")) + (workerResponse \ "cores").extract[Int] should be (2) + (workerResponse \ "masterwebuiurl").extract[String] should be (reverseProxyUrl) + } + } + } finally { + localCluster.stop() + } + } + test("basic scheduling - spread out") { basicScheduling(spreadOut = true) } diff --git a/docs/configuration.md b/docs/configuration.md index 2f801961050e1..d15cf569953af 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -634,6 +634,20 @@ Apart from these, the following properties are also available, and may be useful collecting. + + spark.ui.reverseProxy + false + + To enable running Spark Master, worker and application UI behind a reverse proxy. In this mode, Spark master will reverse proxy the worker and application UIs to enable access. + + + + spark.ui.reverseProxyUrl + http://localhost:8080 + + This is the URL where your proxy is running. Make sure this is a complete URL includeing scheme (http/https) and port to reach your proxy. + + spark.worker.ui.retainedExecutors 1000 diff --git a/pom.xml b/pom.xml index 989658216e5fd..6272ace54f79e 100644 --- a/pom.xml +++ b/pom.xml @@ -339,6 +339,18 @@ ${jetty.version} provided
    + + org.eclipse.jetty + jetty-proxy + ${jetty.version} + provided + + + org.eclipse.jetty + jetty-client + ${jetty.version} + provided + org.eclipse.jetty jetty-util @@ -2258,6 +2270,8 @@ org.spark-project.spark:unused org.eclipse.jetty:jetty-io org.eclipse.jetty:jetty-http + org.eclipse.jetty:jetty-proxy + org.eclipse.jetty:jetty-client org.eclipse.jetty:jetty-continuation org.eclipse.jetty:jetty-servlet org.eclipse.jetty:jetty-servlets From c695f3d420f18d0ab443a763490f1b634354f81c Mon Sep 17 00:00:00 2001 From: Gurvinder Singh Date: Wed, 29 Jun 2016 07:10:41 +0200 Subject: [PATCH 2/8] default to master public url, but if proxy url is given use that --- .../scala/org/apache/spark/deploy/master/Master.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 203a33d9c78a3..35aca5e585aff 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -129,13 +129,9 @@ private[deploy] class Master( logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}") webUi = new MasterWebUI(this, webUiPort) webUi.bind() + masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort if (reverseProxy) { - masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", null) - if (masterWebUiUrl == null) { - throw new SparkException("spark.ui.reverseProxyUrl must be provided") - } - } else { - masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort + masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl) } checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { From f435e4d399ff73063f557885938e765548162f17 Mon Sep 17 00:00:00 2001 From: Gurvinder Singh Date: Wed, 6 Jul 2016 07:54:07 +0200 Subject: [PATCH 3/8] 
fixed syntax and a typo --- core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 3 +-- docs/configuration.md | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index a4f2cf0632f06..4b3c8cb92857a 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -200,8 +200,7 @@ private[spark] object JettyUtils extends Logging { val uri = new StringBuilder(target) if (target.endsWith("/")) uri.setLength(uri.length() - 1) val rest = path.substring(prefix.length()) - if (!rest.isEmpty()) - { + if (!rest.isEmpty()) { if (!rest.startsWith("/")) { uri.append("/") } diff --git a/docs/configuration.md b/docs/configuration.md index d15cf569953af..655f980b5cbbb 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -645,7 +645,7 @@ Apart from these, the following properties are also available, and may be useful spark.ui.reverseProxyUrl http://localhost:8080 - This is the URL where your proxy is running. Make sure this is a complete URL includeing scheme (http/https) and port to reach your proxy. + This is the URL where your proxy is running. Make sure this is a complete URL including scheme (http/https) and port to reach your proxy. 
From f7cec6cdd5e3bdbdad70ea48e9c0da6885d4909f Mon Sep 17 00:00:00 2001 From: Gurvinder Singh Date: Wed, 20 Jul 2016 11:14:56 +0200 Subject: [PATCH 4/8] changed proxy url path from target to proxy to make it explicit that we are proxying --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- .../org/apache/spark/deploy/master/ui/ApplicationPage.scala | 4 ++-- .../org/apache/spark/deploy/master/ui/MasterPage.scala | 6 +++--- .../org/apache/spark/deploy/master/ui/MasterWebUI.scala | 2 +- .../org/apache/spark/deploy/worker/ExecutorRunner.scala | 2 +- .../scala/org/apache/spark/deploy/master/MasterSuite.scala | 2 +- 6 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d6b6bfd01fea8..4c4a52867e9d1 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -503,7 +503,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli _applicationAttemptId = taskScheduler.applicationAttemptId() _conf.set("spark.app.id", _applicationId) if (_conf.getBoolean("spark.ui.reverseProxy", false)) { - System.setProperty("spark.ui.proxyBase", "/target/" + _applicationId) + System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId) } _ui.foreach(_.setAppId(_applicationId)) _env.blockManager.initialize(_applicationId) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 0a36c9094a528..2c9b790fd25bf 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -80,7 +80,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
  • { if (parent.master.reverseProxy) { - Application Detail UI + Application Detail UI } else { Application Detail UI } @@ -109,7 +109,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") private def executorRow(executor: ExecutorDesc): Seq[Node] = { var workerUrlRef = executor.worker.webUiAddress - if (parent.master.reverseProxy) workerUrlRef = "/target/" + executor.worker.id + "/" + if (parent.master.reverseProxy) workerUrlRef = "/proxy/" + executor.worker.id + "/" {executor.id} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala index 6d653c4f1fe2c..d17c4d04bf274 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala @@ -178,7 +178,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { { if (parent.master.reverseProxy) { - {worker.id} + {worker.id} } else { {worker.id} } @@ -217,7 +217,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { app.desc.name } else { if (parent.master.reverseProxy) { - {app.desc.name} + {app.desc.name} } else { {app.desc.name} } @@ -256,7 +256,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { {driver.submitDate} {driver.worker.map(w => if (parent.master.reverseProxy) { - {w.id.toString} + {w.id.toString} } else { {w.id.toString} }).getOrElse("None")} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 898c14e031fbb..5f7d810b8dc7e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -55,7 +55,7 @@ class MasterWebUI( } def addProxyTargets(id: String, target: String): Unit = { - val handler = 
createProxyHandler("/target/" + id, target) + val handler = createProxyHandler("/proxy/" + id, target) attachHandler(handler) proxyHandlers(id) = handler } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index c462d339645f6..d4d8521cc8204 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -157,7 +157,7 @@ private[deploy] class ExecutorRunner( // Add webUI log urls val baseUrl = if (conf.getBoolean("spark.ui.reverseProxy", false)) { - s"/target/$workerId/logPage/?appId=$appId&executorId=$execId&logType=" + s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType=" } else { s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType=" } diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 341fc3d5c5882..fcccaf2adea1c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -173,7 +173,7 @@ class MasterSuite extends SparkFunSuite workers.size should be (2) workers.foreach { workerSummaryJson => val JString(workerId) = workerSummaryJson \ "id" - val url = s"http://localhost:${localCluster.masterWebUIPort}/target/${workerId}/json" + val url = s"http://localhost:${localCluster.masterWebUIPort}/proxy/${workerId}/json" val workerResponse = parse(Source.fromURL(url) .getLines().mkString("\n")) (workerResponse \ "cores").extract[Int] should be (2) From f649e5d846c613468f0db0e66ac855f5cfcd1604 Mon Sep 17 00:00:00 2001 From: Gurvinder Singh Date: Sat, 13 Aug 2016 20:47:57 +0200 Subject: [PATCH 5/8] fixed syntax and added test for proxyURI --- core/pom.xml | 2 +- .../apache/spark/deploy/master/Master.scala | 16 +++-- 
.../spark/deploy/master/ui/MasterWebUI.scala | 8 ++- .../org/apache/spark/ui/JettyUtils.scala | 68 +++++++++++-------- .../scala/org/apache/spark/ui/UISuite.scala | 14 ++++ 5 files changed, 73 insertions(+), 35 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index be74457f169f4..175c24d457580 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -399,7 +399,7 @@ true true - guava,jetty-io,jetty-servlet,jetty-servlets,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server,jetty-security + guava,jetty-io,jetty-servlet,jetty-servlets,jetty-continuation,jetty-http,jetty-plus,jetty-util,jetty-server,jetty-security,jetty-proxy,jetty-client true diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 35aca5e585aff..7c3ef98809eb9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -759,7 +759,9 @@ private[deploy] class Master( workers += worker idToWorker(worker.id) = worker addressToWorker(workerAddress) = worker - if (reverseProxy) webUi.addProxyTargets(worker.id, worker.webUiAddress) + if (reverseProxy) { + webUi.addProxyTargets(worker.id, worker.webUiAddress) + } true } @@ -768,7 +770,9 @@ private[deploy] class Master( worker.setState(WorkerState.DEAD) idToWorker -= worker.id addressToWorker -= worker.endpoint.address - if (reverseProxy) webUi.removeProxyTargets(worker.id) + if (reverseProxy) { + webUi.removeProxyTargets(worker.id) + } for (exec <- worker.executors.values) { logInfo("Telling app of lost executor: " + exec.id) exec.application.driver.send(ExecutorUpdated( @@ -816,7 +820,9 @@ private[deploy] class Master( endpointToApp(app.driver) = app addressToApp(appAddress) = app waitingApps += app - if (reverseProxy) webUi.addProxyTargets(app.id, app.desc.appUiUrl) + if (reverseProxy) { + webUi.addProxyTargets(app.id, app.desc.appUiUrl) + } } private def 
finishApplication(app: ApplicationInfo) { @@ -830,7 +836,9 @@ private[deploy] class Master( idToApp -= app.id endpointToApp -= app.driver addressToApp -= app.driver.address - if (reverseProxy) webUi.removeProxyTargets(app.id) + if (reverseProxy) { + webUi.removeProxyTargets(app.id) + } if (completedApps.size >= RETAINED_APPLICATIONS) { val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1) completedApps.take(toRemove).foreach { a => diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 5f7d810b8dc7e..33baccda12b78 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -38,7 +38,7 @@ class MasterWebUI( val masterEndpointRef = master.self val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true) - val proxyHandlers = new HashMap[String, ServletContextHandler] + private val proxyHandlers = new HashMap[String, ServletContextHandler] initialize() @@ -55,7 +55,11 @@ class MasterWebUI( } def addProxyTargets(id: String, target: String): Unit = { - val handler = createProxyHandler("/proxy/" + id, target) + var endTarget: String = target + if (endTarget.endsWith("/")) { + endTarget = endTarget.substring(0, endTarget.length()-1) + } + val handler = createProxyHandler("/proxy/" + id, endTarget) attachHandler(handler) proxyHandlers(id) = handler } diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 4b3c8cb92857a..a6e565e7055aa 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -194,41 +194,25 @@ private[spark] object JettyUtils extends Logging { target: String): ServletContextHandler = { val servlet = new ProxyServlet { override def rewriteTarget(request: HttpServletRequest): String 
= { - val path = request.getRequestURI(); - if (!path.startsWith(prefix)) return null - - val uri = new StringBuilder(target) - if (target.endsWith("/")) uri.setLength(uri.length() - 1) - val rest = path.substring(prefix.length()) - if (!rest.isEmpty()) { - if (!rest.startsWith("/")) { - uri.append("/") - } - uri.append(rest) - } - - val query = request.getQueryString() - if (query != null) { - // Is there at least one path segment ? - val separator = "://" - if (uri.indexOf("/", uri.indexOf(separator) + separator.length()) < 0) { - uri.append("/") - } - uri.append("?").append(query) + val rewrittenURI = createProxyURI( + prefix, + target, + request.getRequestURI(), + request.getQueryString()) + if (rewrittenURI == null) { + return null } - val rewrittenURI = URI.create(uri.toString()).normalize() - if (!validateDestination(rewrittenURI.getHost(), rewrittenURI.getPort())) { return null } - rewrittenURI.toString(); } + override def filterServerResponseHeader( - clientRequest: HttpServletRequest, - serverResponse: Response, - headerName: String, - headerValue: String): String = { + clientRequest: HttpServletRequest, + serverResponse: Response, + headerName: String, + headerValue: String): String = { if (headerName.equalsIgnoreCase("location")) { val targetUri = serverResponse.getRequest().getURI(); val toReplace = targetUri.getScheme() + "://" + targetUri.getAuthority(); @@ -394,6 +378,34 @@ private[spark] object JettyUtils extends Logging { redirectHandler } + def createProxyURI (prefix: String, target: String, path: String, query: String): URI = { + if (!path.startsWith(prefix)) { + return null + } + + val uri = new StringBuilder(target) + val rest = path.substring(prefix.length()) + + if (!rest.isEmpty()) { + if (!rest.startsWith("/")) { + uri.append("/") + } + uri.append(rest) + } + + val rewrittenURI = URI.create(uri.toString()) + if (query != null) { + return new URI( + rewrittenURI.getScheme(), + rewrittenURI.getAuthority(), + rewrittenURI.getPath(), + query, 
+ rewrittenURI.getFragment() + ).normalize() + } + rewrittenURI.normalize() + } + // Create a new URI from the arguments, handling IPv6 host encoding and default ports. private def createRedirectURI( scheme: String, server: String, port: Int, path: String, query: String) = { diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 2b59b48d8bc98..e219f2d4232df 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -190,6 +190,20 @@ class UISuite extends SparkFunSuite { } } + test("verify proxy rewrittenURI") { + val prefix = "/proxy/worker-id" + val target = "http://localhost:8081" + val path = "/proxy/worker-id/json" + var rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, null) + assert(rewrittenURI.toString().equals("http://localhost:8081/json")) + rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, "test=done") + assert(rewrittenURI.toString().equals("http://localhost:8081/json?test=done")) + rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id", null) + assert(rewrittenURI.toString().equals("http://localhost:8081")) + rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-noid/json", null) + assert(rewrittenURI == null) + } + def stopServer(info: ServerInfo): Unit = { if (info != null && info.server != null) info.server.stop } From 128c4a651a26998a6822594b88538fee18d87876 Mon Sep 17 00:00:00 2001 From: Gurvinder Singh Date: Tue, 16 Aug 2016 17:00:17 +0200 Subject: [PATCH 6/8] added test for location header and fixed more syntax --- .../deploy/master/ui/ApplicationPage.scala | 13 +++---- .../spark/deploy/master/ui/MasterPage.scala | 25 +++++-------- .../spark/deploy/master/ui/MasterWebUI.scala | 8 ++--- .../org/apache/spark/ui/JettyUtils.scala | 35 ++++++++++++------- .../scala/org/apache/spark/ui/UIUtils.scala | 12 +++++++ 
.../spark/deploy/master/MasterSuite.scala | 3 +- .../scala/org/apache/spark/ui/UISuite.scala | 31 +++++++++++++--- 7 files changed, 77 insertions(+), 50 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 2c9b790fd25bf..17c521cbf983f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -78,13 +78,8 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") { if (!app.isFinished) {
  • - { - if (parent.master.reverseProxy) { - Application Detail UI - } else { - Application Detail UI - } - } + Application Detail UI
  • } } @@ -108,8 +103,8 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") } private def executorRow(executor: ExecutorDesc): Seq[Node] = { - var workerUrlRef = executor.worker.webUiAddress - if (parent.master.reverseProxy) workerUrlRef = "/proxy/" + executor.worker.id + "/" + val workerUrlRef = UIUtils.makeHref(parent.master.reverseProxy, + executor.worker.id, executor.worker.webUiAddress) {executor.id} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala index d17c4d04bf274..3fb860582cc17 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala @@ -176,13 +176,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { private def workerRow(worker: WorkerInfo): Seq[Node] = { - { - if (parent.master.reverseProxy) { - {worker.id} - } else { - {worker.id} - } - } + {worker.id} {worker.host}:{worker.port} {worker.state} @@ -216,11 +211,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { if (app.isFinished) { app.desc.name } else { - if (parent.master.reverseProxy) { - {app.desc.name} - } else { - {app.desc.name} - } + {app.desc.name} } } @@ -255,11 +247,10 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { {driver.id} {killLink} {driver.submitDate} {driver.worker.map(w => - if (parent.master.reverseProxy) { - {w.id.toString} - } else { - {w.id.toString} - }).getOrElse("None")} + + {w.id.toString} + ).getOrElse("None")} {driver.state} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 33baccda12b78..8cfd0f682932d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ 
b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -55,18 +55,14 @@ class MasterWebUI( } def addProxyTargets(id: String, target: String): Unit = { - var endTarget: String = target - if (endTarget.endsWith("/")) { - endTarget = endTarget.substring(0, endTarget.length()-1) - } + var endTarget = target.stripSuffix("/") val handler = createProxyHandler("/proxy/" + id, endTarget) attachHandler(handler) proxyHandlers(id) = handler } def removeProxyTargets(id: String): Unit = { - detachHandler(proxyHandlers.get(id).get) - proxyHandlers -= id + proxyHandlers.remove(id).foreach(detachHandler) } } diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index a6e565e7055aa..24f3f757157f3 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -195,17 +195,14 @@ private[spark] object JettyUtils extends Logging { val servlet = new ProxyServlet { override def rewriteTarget(request: HttpServletRequest): String = { val rewrittenURI = createProxyURI( - prefix, - target, - request.getRequestURI(), - request.getQueryString()) + prefix, target, request.getRequestURI(), request.getQueryString()) if (rewrittenURI == null) { return null } if (!validateDestination(rewrittenURI.getHost(), rewrittenURI.getPort())) { return null } - rewrittenURI.toString(); + rewrittenURI.toString() } override def filterServerResponseHeader( @@ -214,14 +211,14 @@ private[spark] object JettyUtils extends Logging { headerName: String, headerValue: String): String = { if (headerName.equalsIgnoreCase("location")) { - val targetUri = serverResponse.getRequest().getURI(); - val toReplace = targetUri.getScheme() + "://" + targetUri.getAuthority(); - if (headerValue.startsWith(toReplace)) { - return clientRequest.getScheme() + "://" + clientRequest.getHeader("host") + - prefix + headerValue.substring(toReplace.length()) + val newHeader = 
createProxyLocationHeader( + prefix, headerValue, clientRequest, serverResponse.getRequest().getURI()) + if (newHeader != null) { + return newHeader } } - super.filterServerResponseHeader(clientRequest, serverResponse, headerName, headerValue); + super.filterServerResponseHeader( + clientRequest, serverResponse, headerName, headerValue) } } @@ -378,7 +375,7 @@ private[spark] object JettyUtils extends Logging { redirectHandler } - def createProxyURI (prefix: String, target: String, path: String, query: String): URI = { + def createProxyURI(prefix: String, target: String, path: String, query: String): URI = { if (!path.startsWith(prefix)) { return null } @@ -406,6 +403,20 @@ private[spark] object JettyUtils extends Logging { rewrittenURI.normalize() } + def createProxyLocationHeader( + prefix: String, + headerValue: String, + clientRequest: HttpServletRequest, + targetUri: URI): String = { + val toReplace = targetUri.getScheme() + "://" + targetUri.getAuthority() + if (headerValue.startsWith(toReplace)) { + clientRequest.getScheme() + "://" + clientRequest.getHeader("host") + + prefix + headerValue.substring(toReplace.length()) + } else { + null + } + } + // Create a new URI from the arguments, handling IPv6 host encoding and default ports. private def createRedirectURI( scheme: String, server: String, port: Int, path: String, query: String) = { diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 2b6c538485c51..c0d1a2220f62a 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -510,4 +510,16 @@ private[spark] object UIUtils extends Logging { def getTimeZoneOffset() : Int = TimeZone.getDefault().getOffset(System.currentTimeMillis()) / 1000 / 60 + + /** + * Return the correct Href after checking if master is running in the + * reverse proxy mode or not. 
+ */ + def makeHref(proxy: Boolean, id: String, origHref: String): String = { + if (proxy) { + s"/proxy/$id" + } else { + origHref + } + } } diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index fcccaf2adea1c..831a7bcb12743 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -174,8 +174,7 @@ class MasterSuite extends SparkFunSuite workers.foreach { workerSummaryJson => val JString(workerId) = workerSummaryJson \ "id" val url = s"http://localhost:${localCluster.masterWebUIPort}/proxy/${workerId}/json" - val workerResponse = parse(Source.fromURL(url) - .getLines().mkString("\n")) + val workerResponse = parse(Source.fromURL(url).getLines().mkString("\n")) (workerResponse \ "cores").extract[Int] should be (2) (workerResponse \ "masterwebuiurl").extract[String] should be (reverseProxyUrl) } diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index e219f2d4232df..dbb8dca4c8dab 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -18,10 +18,13 @@ package org.apache.spark.ui import java.net.{BindException, ServerSocket} +import java.net.URI +import javax.servlet.http.HttpServletRequest import scala.io.Source import org.eclipse.jetty.servlet.ServletContextHandler +import org.mockito.Mockito.{mock, when} import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ @@ -195,13 +198,33 @@ class UISuite extends SparkFunSuite { val target = "http://localhost:8081" val path = "/proxy/worker-id/json" var rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, null) - assert(rewrittenURI.toString().equals("http://localhost:8081/json")) + assert(rewrittenURI.toString() === 
"http://localhost:8081/json") rewrittenURI = JettyUtils.createProxyURI(prefix, target, path, "test=done") - assert(rewrittenURI.toString().equals("http://localhost:8081/json?test=done")) + assert(rewrittenURI.toString() === "http://localhost:8081/json?test=done") rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id", null) - assert(rewrittenURI.toString().equals("http://localhost:8081")) + assert(rewrittenURI.toString() === "http://localhost:8081") + rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id/test%2F", null) + assert(rewrittenURI.toString() === "http://localhost:8081/test%2F") + rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-id/%F0%9F%98%84", null) + assert(rewrittenURI.toString() === "http://localhost:8081/%F0%9F%98%84") rewrittenURI = JettyUtils.createProxyURI(prefix, target, "/proxy/worker-noid/json", null) - assert(rewrittenURI == null) + assert(rewrittenURI === null) + } + + test("verify rewriting location header for reverse proxy") { + val clientRequest = mock(classOf[HttpServletRequest]) + var headerValue = "http://localhost:4040/jobs" + val prefix = "/proxy/worker-id" + val targetUri = URI.create("http://localhost:4040") + when(clientRequest.getScheme()).thenReturn("http") + when(clientRequest.getHeader("host")).thenReturn("localhost:8080") + var newHeader = JettyUtils.createProxyLocationHeader( + prefix, headerValue, clientRequest, targetUri) + assert(newHeader.toString() === "http://localhost:8080/proxy/worker-id/jobs") + headerValue = "http://localhost:4041/jobs" + newHeader = JettyUtils.createProxyLocationHeader( + prefix, headerValue, clientRequest, targetUri) + assert(newHeader === null) } def stopServer(info: ServerInfo): Unit = { From 0fd44777a2cf935117234adccc6811b90d8bcdb8 Mon Sep 17 00:00:00 2001 From: Gurvinder Singh Date: Tue, 16 Aug 2016 20:20:41 +0200 Subject: [PATCH 7/8] added docs with warning about reverse proxy being global to cluster and restricting direct 
access to application UI --- docs/configuration.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 655f980b5cbbb..182b533ccaec5 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -638,14 +638,14 @@ Apart from these, the following properties are also available, and may be useful spark.ui.reverseProxy false - To enable running Spark Master, worker and application UI behined a reverse proxy. In this mode, Spark master will reverse proxy the worker and application UIs to enable access. + Enable running Spark Master as reverse proxy for worker and application UIs. In this mode, Spark master will reverse proxy the worker and application UIs to enable access without requiring direct access to their hosts. Use it with caution, as worker and application UI will not be accessible directly, you will only be able to access them through spark master/proxy public URL. This setting affects all the workers and application UIs running in the cluster and must be set on all the workers, drivers and masters. spark.ui.reverseProxyUrl - http://localhost:8080 + - This is the URL where your proxy is running. Make sure this is a complete URL including scheme (http/https) and port to reach your proxy. + This is the URL where your proxy is running. This URL is for proxy which is running in front of Spark Master. This is useful when running proxy for authentication e.g. OAuth proxy. Make sure this is a complete URL including scheme (http/https) and port to reach your proxy. 
From 9f6862ed579cf4edc965e1a9835a79d6aa4c6d0b Mon Sep 17 00:00:00 2001 From: Gurvinder Singh Date: Wed, 17 Aug 2016 20:52:19 +0200 Subject: [PATCH 8/8] print information about accessing UIs when running in proxy mode --- .../org/apache/spark/deploy/master/Master.scala | 2 ++ .../org/apache/spark/deploy/worker/Worker.scala | 3 +++ .../org/apache/spark/repl/SparkILoopInit.scala | 13 ++++++++++++- .../scala/org/apache/spark/repl/SparkILoop.scala | 13 ++++++++++++- 4 files changed, 29 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 7c3ef98809eb9..cb3d37404d7ca 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -132,6 +132,8 @@ private[deploy] class Master( masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort if (reverseProxy) { masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl) + logInfo(s"Spark Master is acting as a reverse proxy. 
Master, Workers and " + + s"Applications UIs are available at $masterWebUiUrl") } checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 724206bf94c68..0bedd9a20a969 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -203,6 +203,9 @@ private[deploy] class Worker( activeMasterWebUiUrl = uiUrl master = Some(masterRef) connected = true + if (conf.getBoolean("spark.ui.reverseProxy", false)) { + logInfo(s"WorkerWebUI is available at $activeMasterWebUiUrl/proxy/$workerId") + } // Cancel any outstanding re-registration attempts because we found a new master cancelLastRegistrationRetry() } diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala index 29f63de8a0fa1..b2a61260c2bb6 100644 --- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala @@ -126,7 +126,18 @@ private[repl] trait SparkILoopInit { @transient val spark = org.apache.spark.repl.Main.interp.createSparkSession() @transient val sc = { val _sc = spark.sparkContext - _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}")) + if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) { + val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null) + if (proxyUrl != null) { + println(s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}") + } else { + println(s"Spark Context Web UI is available at Spark Master Public URL") + } + } else { + _sc.uiWebUrl.foreach { + webUrl => println(s"Spark context Web UI available at 
${webUrl}") + } + } println("Spark context available as 'sc' " + s"(master = ${_sc.master}, app id = ${_sc.applicationId}).") println("Spark session available as 'spark'.") diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala index 2707b0847aefc..76a66c1beada0 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -43,7 +43,18 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) } @transient val sc = { val _sc = spark.sparkContext - _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}")) + if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) { + val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null) + if (proxyUrl != null) { + println(s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}") + } else { + println(s"Spark Context Web UI is available at Spark Master Public URL") + } + } else { + _sc.uiWebUrl.foreach { + webUrl => println(s"Spark context Web UI available at ${webUrl}") + } + } println("Spark context available as 'sc' " + s"(master = ${_sc.master}, app id = ${_sc.applicationId}).") println("Spark session available as 'spark'.")