From 258417c124f94c4ee104843585b3dc2b4438d215 Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Wed, 5 Nov 2014 18:38:04 -0800
Subject: [PATCH 1/4] [SPARK-4277] Support external shuffle service on executor

---
 .../org/apache/spark/SecurityManager.scala    | 14 +---
 .../apache/spark/deploy/worker/Worker.scala   |  8 ++-
 .../deploy/worker/WorkerShuffleService.scala  | 64 +++++++++++++++++++
 .../storage/ShuffleBlockFetcherIterator.scala |  2 +-
 .../spark/network/sasl/SaslMessage.java       |  3 +-
 5 files changed, 77 insertions(+), 14 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/deploy/worker/WorkerShuffleService.scala

diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index dee935ffad51f..dbff9d12b5ad7 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -343,15 +343,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with
    */
   def getSecretKey(): String = secretKey
 
-  override def getSaslUser(appId: String): String = {
-    val myAppId = sparkConf.getAppId
-    require(appId == myAppId, s"SASL appId $appId did not match my appId ${myAppId}")
-    getSaslUser()
-  }
-
-  override def getSecretKey(appId: String): String = {
-    val myAppId = sparkConf.getAppId
-    require(appId == myAppId, s"SASL appId $appId did not match my appId ${myAppId}")
-    getSecretKey()
-  }
+  // Default SecurityManager only has a single secret key, so ignore appId.
+  override def getSaslUser(appId: String): String = getSaslUser()
+  override def getSecretKey(appId: String): String = getSecretKey()
 }

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index f1f66d0903f1c..a954ac153242e 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -111,6 +111,9 @@ private[spark] class Worker(
   val drivers = new HashMap[String, DriverRunner]
   val finishedDrivers = new HashMap[String, DriverRunner]
 
+  // The shuffle service is not actually started unless configured.
+  var shuffleService = new WorkerShuffleService(conf, securityMgr)
+
   val publicAddress = {
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
@@ -154,6 +157,7 @@ private[spark] class Worker(
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+    shuffleService.startIfEnabled()
     webUi = new WorkerWebUI(this, workDir, webUiPort)
     webUi.bind()
     registerWithMaster()
@@ -419,6 +423,7 @@ private[spark] class Worker(
     registrationRetryTimer.foreach(_.cancel())
     executors.values.foreach(_.kill())
     drivers.values.foreach(_.kill())
+    shuffleService.stop()
     webUi.stop()
     metricsSystem.stop()
   }
@@ -441,7 +446,8 @@ private[spark] object Worker extends Logging {
       cores: Int,
       memory: Int,
       masterUrls: Array[String],
-      workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+      workDir: String,
+      workerNumber: Option[Int] = None): (ActorSystem, Int) = {
 
     // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
     val conf = new SparkConf

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerShuffleService.scala
new file mode 100644
index 0000000000000..92beb70504ab4
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerShuffleService.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.worker
+
+import org.apache.spark.{Logging, SparkConf, SecurityManager}
+import org.apache.spark.network.TransportContext
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.sasl.SaslRpcHandler
+import org.apache.spark.network.server.TransportServer
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+
+/**
+ * Provides a server from which Executors can read shuffle files (rather than reading directly from
+ * each other), to provide uninterrupted access to the files in the face of executors being turned
+ * off or killed.
+ *
+ * Optionally requires SASL authentication in order to read. See [[SecurityManager]].
+ */
+class WorkerShuffleService(sparkConf: SparkConf, securityManager: SecurityManager) extends Logging {
+
+  private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
+  private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
+  private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
+
+  private val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
+  private val blockHandler = new ExternalShuffleBlockHandler()
+  private val transportContext: TransportContext = {
+    val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler
+    new TransportContext(transportConf, handler)
+  }
+
+  private var server: TransportServer = _
+
+  /** Starts the external shuffle service if the user has configured us to. */
+  def startIfEnabled() {
+    if (enabled) {
+      require(server == null, "Shuffle server already started")
+      logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
+      server = transportContext.createServer(port)
+    }
+  }
+
+  def stop() {
+    if (enabled && server != null) {
+      server.close()
+      server = null
+    }
+  }
+}

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 1e579187e4193..6b1f57a069431 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -92,7 +92,7 @@ final class ShuffleBlockFetcherIterator(
    * Current [[FetchResult]] being processed. We track this so we can release the current buffer
    * in case of a runtime exception when processing the current buffer.
    */
-  private[this] var currentResult: FetchResult = null
+  @volatile private[this] var currentResult: FetchResult = null
 
   /**
    * Queue of fetch requests to issue; we'll pull requests off this gradually to make sure that

diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslMessage.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslMessage.java
index 5b77e18c26bf4..599cc6428c90e 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslMessage.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslMessage.java
@@ -58,7 +58,8 @@ public void encode(ByteBuf buf) {
 
   public static SaslMessage decode(ByteBuf buf) {
     if (buf.readByte() != TAG_BYTE) {
-      throw new IllegalStateException("Expected SaslMessage, received something else");
+      throw new IllegalStateException("Expected SaslMessage, received something else"
+        + " (maybe your client does not have SASL enabled?)");
    }
 
    int idLength = buf.readInt();
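
For illustration, here is a minimal sketch (not part of the patch) of the wiring that Worker.scala now performs, using only the API introduced above; spark.shuffle.service.enabled defaults to false and spark.shuffle.service.port defaults to 7337:

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.deploy.worker.WorkerShuffleService

    val conf = new SparkConf()
      .set("spark.shuffle.service.enabled", "true") // off by default
      .set("spark.shuffle.service.port", "7337")    // 7337 is already the default

    val securityMgr = new SecurityManager(conf)
    val shuffleService = new WorkerShuffleService(conf, securityMgr)
    shuffleService.startIfEnabled() // binds a TransportServer only when enabled
    // ... executors fetch shuffle blocks from this server ...
    shuffleService.stop()           // no-op if the server was never started
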
+ */ +class WorkerShuffleService(sparkConf: SparkConf, securityManager: SecurityManager) extends Logging { + + private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false) + private val port = sparkConf.getInt("spark.shuffle.service.port", 7337) + private val useSasl: Boolean = securityManager.isAuthenticationEnabled() + + private val transportConf = SparkTransportConf.fromSparkConf(sparkConf) + private val blockHandler = new ExternalShuffleBlockHandler() + private val transportContext: TransportContext = { + val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler + new TransportContext(transportConf, handler) + } + + private var server: TransportServer = _ + + /** Starts the external shuffle service if the user has configured us to. */ + def startIfEnabled() { + if (enabled) { + require(server == null, "Shuffle server already started") + logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl") + server = transportContext.createServer(port) + } + } + + def stop() { + if (enabled && server != null) { + server.close() + server = null + } + } +} diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 1e579187e4193..6b1f57a069431 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -92,7 +92,7 @@ final class ShuffleBlockFetcherIterator( * Current [[FetchResult]] being processed. We track this so we can release the current buffer * in case of a runtime exception when processing the current buffer. */ - private[this] var currentResult: FetchResult = null + @volatile private[this] var currentResult: FetchResult = null /** * Queue of fetch requests to issue; we'll pull requests off this gradually to make sure that diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslMessage.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslMessage.java index 5b77e18c26bf4..599cc6428c90e 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslMessage.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslMessage.java @@ -58,7 +58,8 @@ public void encode(ByteBuf buf) { public static SaslMessage decode(ByteBuf buf) { if (buf.readByte() != TAG_BYTE) { - throw new IllegalStateException("Expected SaslMessage, received something else"); + throw new IllegalStateException("Expected SaslMessage, received something else" + + " (maybe your client does not have SASL enabled?)"); } int idLength = buf.readInt(); From 47f49d3e35bc1203dd1b04c4365a7c9005501a21 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Thu, 6 Nov 2014 11:47:55 -0800 Subject: [PATCH 2/4] NettyBlockTransferService shouldn't care about app ids (it's only b/t executors) --- .../netty/NettyBlockTransferSecuritySuite.scala | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala index bed0ed9d713dd..9162ec9801663 100644 --- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala +++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala @@ -89,18 +89,6 @@ class NettyBlockTransferSecuritySuite extends 
From 2dcdfc1444fa511e6be714288b80242402a78e19 Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Thu, 6 Nov 2014 12:11:50 -0800
Subject: [PATCH 3/4] Add private[worker]

---
 .../org/apache/spark/deploy/worker/WorkerShuffleService.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerShuffleService.scala
index 92beb70504ab4..c9b9a1394bfd7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerShuffleService.scala
@@ -31,6 +31,7 @@ import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
  *
  * Optionally requires SASL authentication in order to read. See [[SecurityManager]].
  */
+private[worker]
 class WorkerShuffleService(sparkConf: SparkConf, securityManager: SecurityManager) extends Logging {
 
   private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
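
The one-line change above narrows visibility: private[worker] makes the class accessible only from within org.apache.spark.deploy.worker. A sketch of the effect (the outside package named below is hypothetical):

    // Inside org.apache.spark.deploy.worker (e.g. Worker.scala), this still compiles:
    //   val service = new WorkerShuffleService(conf, securityMgr)
    // From any other package, e.g. a hypothetical org.apache.spark.examples,
    // the same reference is rejected at compile time with an error along the
    // lines of: "class WorkerShuffleService in package worker cannot be accessed".
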
From 3780bd704adfc738ea70bd6d311a2c824e3849c6 Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Thu, 6 Nov 2014 15:26:28 -0800
Subject: [PATCH 4/4] Address comments

---
 ...uffleService.scala => StandaloneWorkerShuffleService.scala} | 3 ++-
 .../src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)
 rename core/src/main/scala/org/apache/spark/deploy/worker/{WorkerShuffleService.scala => StandaloneWorkerShuffleService.scala} (95%)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
similarity index 95%
rename from core/src/main/scala/org/apache/spark/deploy/worker/WorkerShuffleService.scala
rename to core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
index c9b9a1394bfd7..88118e2837741 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala
@@ -32,7 +32,8 @@ import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
  * Optionally requires SASL authentication in order to read. See [[SecurityManager]].
  */
 private[worker]
-class WorkerShuffleService(sparkConf: SparkConf, securityManager: SecurityManager) extends Logging {
+class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
+  extends Logging {
 
   private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
   private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index a954ac153242e..ca262de832e25 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -112,7 +112,7 @@ private[spark] class Worker(
   val finishedDrivers = new HashMap[String, DriverRunner]
 
   // The shuffle service is not actually started unless configured.
-  var shuffleService = new WorkerShuffleService(conf, securityMgr)
+  val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
 
   val publicAddress = {
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
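
A hedged sketch of the end state after the rename (this code would have to live inside org.apache.spark.deploy.worker, since the class is private[worker]). Enabling authentication makes the service wrap its handler in SaslRpcHandler, which pairs with the clearer error message added to SaslMessage.decode in PATCH 1/4:

    import org.apache.spark.{SecurityManager, SparkConf}

    val conf = new SparkConf()
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.authenticate", "true")          // turns on SASL for the service
      .set("spark.authenticate.secret", "secret")
    val shuffleService = new StandaloneWorkerShuffleService(conf, new SecurityManager(conf))
    shuffleService.startIfEnabled()
    // A client that connects without SASL now fails fast with
    // "Expected SaslMessage, received something else (maybe your client does
    // not have SASL enabled?)" rather than a cryptic decode error.
    shuffleService.stop()
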