From 1118c8f7020aae7af8ef2c38f68ad2d672f912a2 Mon Sep 17 00:00:00 2001 From: Bertrand Bossy Date: Fri, 19 Feb 2016 14:59:34 +0100 Subject: [PATCH 1/5] SPARK-12583: Heartbeat from MesosExternalShuffleClient to shuffle service --- .../mesos/MesosExternalShuffleClient.java | 72 ++++++++++++--- .../protocol/BlockTransferMessage.java | 5 +- .../protocol/mesos/RegisterDriver.java | 14 ++- .../mesos/MesosExternalShuffleService.scala | 90 ++++++++++++------- .../mesos/CoarseMesosSchedulerBackend.scala | 7 +- .../CoarseMesosSchedulerBackendSuite.scala | 3 +- .../mesos/ShuffleServiceHeartbeat.java | 36 ++++++++ 7 files changed, 174 insertions(+), 53 deletions(-) create mode 100644 network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java index 675820308bd4c..1374a0ee57211 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java @@ -19,7 +19,12 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +46,13 @@ public class MesosExternalShuffleClient extends ExternalShuffleClient { private final Logger logger = LoggerFactory.getLogger(MesosExternalShuffleClient.class); + private final ScheduledExecutorService heartbeaterThread = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("mesos-external-shuffle-client-heartbeater") + .build()); + /** * Creates an Mesos external shuffle client that wraps the {@link ExternalShuffleClient}. * Please refer to docs on {@link ExternalShuffleClient} for more information. @@ -53,21 +65,55 @@ public MesosExternalShuffleClient( super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled); } - public void registerDriverWithShuffleService(String host, int port) throws IOException { + public void registerDriverWithShuffleService(String host, int port, long heartbeatTimeoutMs) + throws IOException { + checkInit(); - ByteBuffer registerDriver = new RegisterDriver(appId).toByteBuffer(); + ByteBuffer registerDriver = new RegisterDriver(appId, heartbeatTimeoutMs).toByteBuffer(); TransportClient client = clientFactory.createClient(host, port); - client.sendRpc(registerDriver, new RpcResponseCallback() { - @Override - public void onSuccess(ByteBuffer response) { - logger.info("Successfully registered app " + appId + " with external shuffle service."); - } - - @Override - public void onFailure(Throwable e) { - logger.warn("Unable to register app " + appId + " with external shuffle service. " + + client.sendRpc(registerDriver, new RegisterDriverCallback(client, heartbeatTimeoutMs)); + } + + private class RegisterDriverCallback implements RpcResponseCallback { + private final TransportClient client; + private final long heartbeatTimeoutMs; + + private RegisterDriverCallback(TransportClient client, long heartbeatTimeoutMs) { + this.client = client; + this.heartbeatTimeoutMs = heartbeatTimeoutMs; + } + + @Override + public void onSuccess(ByteBuffer response) { + heartbeaterThread.scheduleAtFixedRate( + new Heartbeater(client), 0, heartbeatTimeoutMs / 4, TimeUnit.MILLISECONDS); + logger.info("Successfully registered app " + appId + " with external shuffle service."); + } + + @Override + public void onFailure(Throwable e) { + logger.warn("Unable to register app " + appId + " with external shuffle service. " + "Please manually remove shuffle data after driver exit. Error: " + e); - } - }); + } + } + + @Override + public void close() { + heartbeaterThread.shutdownNow(); + super.close(); + } + + private class Heartbeater implements Runnable { + + private final TransportClient client; + + private Heartbeater(TransportClient client) { + this.client = client; + } + + @Override + public void run() { + client.send(new ShuffleServiceHeartbeat(appId).toByteBuffer()); + } } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java index 7fbe3384b4d4f..21c0ff4136aa8 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java @@ -24,6 +24,7 @@ import org.apache.spark.network.protocol.Encodable; import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver; +import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat; /** * Messages handled by the {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler}, or @@ -40,7 +41,8 @@ public abstract class BlockTransferMessage implements Encodable { /** Preceding every serialized message is its type, which allows us to deserialize it. */ public static enum Type { - OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4); + OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4), + HEARTBEAT(5); private final byte id; @@ -64,6 +66,7 @@ public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) { case 2: return RegisterExecutor.decode(buf); case 3: return StreamHandle.decode(buf); case 4: return RegisterDriver.decode(buf); + case 5: return ShuffleServiceHeartbeat.decode(buf); default: throw new IllegalArgumentException("Unknown message type: " + type); } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java index eeb0019411628..d5f53ccb7f741 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java @@ -31,29 +31,34 @@ */ public class RegisterDriver extends BlockTransferMessage { private final String appId; + private final long heartbeatTimeoutMs; - public RegisterDriver(String appId) { + public RegisterDriver(String appId, long heartbeatTimeoutMs) { this.appId = appId; + this.heartbeatTimeoutMs = heartbeatTimeoutMs; } public String getAppId() { return appId; } + public long getHeartbeatTimeoutMs() { return heartbeatTimeoutMs; } + @Override protected Type type() { return Type.REGISTER_DRIVER; } @Override public int encodedLength() { - return Encoders.Strings.encodedLength(appId); + return Encoders.Strings.encodedLength(appId) + Long.SIZE / Byte.SIZE; } @Override public void encode(ByteBuf buf) { Encoders.Strings.encode(buf, appId); + buf.writeLong(heartbeatTimeoutMs); } @Override public int hashCode() { - return Objects.hashCode(appId); + return Objects.hashCode(appId, heartbeatTimeoutMs); } @Override @@ -66,6 +71,7 @@ public boolean equals(Object o) { public static RegisterDriver decode(ByteBuf buf) { String appId = Encoders.Strings.decode(buf); - return new RegisterDriver(appId); + long heartbeatTimeout = buf.readLong(); + return new RegisterDriver(appId, heartbeatTimeout); } } diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala index 4172d924c802d..c57323affadb9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala @@ -17,69 +17,92 @@ package org.apache.spark.deploy.mesos -import java.net.SocketAddress import java.nio.ByteBuffer +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} -import scala.collection.mutable +import scala.collection.JavaConverters._ import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.ExternalShuffleService import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler import org.apache.spark.network.shuffle.protocol.BlockTransferMessage -import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver +import org.apache.spark.network.shuffle.protocol.mesos.{RegisterDriver, ShuffleServiceHeartbeat} import org.apache.spark.network.util.TransportConf +import org.apache.spark.util.ThreadUtils /** * An RPC endpoint that receives registration requests from Spark drivers running on Mesos. * It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]]. */ -private[mesos] class MesosExternalShuffleBlockHandler(transportConf: TransportConf) +private[mesos] class MesosExternalShuffleBlockHandler( + transportConf: TransportConf, cleanerIntervalS: Long) extends ExternalShuffleBlockHandler(transportConf, null) with Logging { - // Stores a map of driver socket addresses to app ids - private val connectedApps = new mutable.HashMap[SocketAddress, String] + ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-cleaner-watcher") + .scheduleAtFixedRate(new CleanerThread(), 0, cleanerIntervalS, TimeUnit.SECONDS) + + // Stores a map of app id to app state (timeout value and last heartbeat) + private val connectedApps = new ConcurrentHashMap[String, AppState]() protected override def handleMessage( message: BlockTransferMessage, client: TransportClient, callback: RpcResponseCallback): Unit = { message match { - case RegisterDriverParam(appId) => + case RegisterDriverParam(appId, appState) => val address = client.getSocketAddress - logDebug(s"Received registration request from app $appId (remote address $address).") - if (connectedApps.contains(address)) { - val existingAppId = connectedApps(address) - if (!existingAppId.equals(appId)) { - logError(s"A new app '$appId' has connected to existing address $address, " + - s"removing previously registered app '$existingAppId'.") - applicationRemoved(existingAppId, true) - } + val timeout = appState.heartbeatTimeout + logInfo(s"Received registration request from app $appId (remote address $address, " + + s"heartbeat timeout $timeout ms).") + if (connectedApps.containsKey(appId)) { + logWarning(s"Received a registration request from app $appId, but it was already " + + s"registered") } - connectedApps(address) = appId + connectedApps.put(appId, appState) callback.onSuccess(ByteBuffer.allocate(0)) + case Heartbeat(appId) => + val address = client.getSocketAddress + Option(connectedApps.get(appId)) match { + case Some(existingAppState) => + logTrace(s"Received ShuffleServiceHeartbeat from app '$appId' (remote " + + s"address $address).") + existingAppState.lastHeartbeat = System.nanoTime() + case None => + logWarning(s"Received ShuffleServiceHeartbeat from an unknown app (remote " + + s"address $address, appId '$appId').") + } case _ => super.handleMessage(message, client, callback) } } - /** - * On connection termination, clean up shuffle files written by the associated application. - */ - override def channelInactive(client: TransportClient): Unit = { - val address = client.getSocketAddress - if (connectedApps.contains(address)) { - val appId = connectedApps(address) - logInfo(s"Application $appId disconnected (address was $address).") - applicationRemoved(appId, true /* cleanupLocalDirs */) - connectedApps.remove(address) - } else { - logWarning(s"Unknown $address disconnected.") - } - } - /** An extractor object for matching [[RegisterDriver]] message. */ private object RegisterDriverParam { - def unapply(r: RegisterDriver): Option[String] = Some(r.getAppId) + def unapply(r: RegisterDriver): Option[(String, AppState)] = + Some((r.getAppId, AppState(r.getHeartbeatTimeoutMs, System.nanoTime()))) + } + + private object Heartbeat { + def unapply(h: ShuffleServiceHeartbeat): Option[String] = Some(h.getAppId) + } + + private case class AppState(heartbeatTimeout: Long, var lastHeartbeat: Long) + + private class CleanerThread extends Runnable { + override def run(): Unit = { + val now = System.nanoTime() + val appsToRemove = connectedApps.asScala.filter{ case (address, appState) => + now - appState.lastHeartbeat > appState.heartbeatTimeout * 1000 * 1000 + } + appsToRemove.foreach { case (appId, appState) => + val nanosSinceLastHeartbeat = System.nanoTime() - appState.lastHeartbeat + if (nanosSinceLastHeartbeat > appState.heartbeatTimeout * 1000 * 1000) { + logInfo(s"Application $appId timed out. Removing shuffle files.") + connectedApps.remove(appId) + applicationRemoved(appId, true) + } + } + } } } @@ -93,7 +116,8 @@ private[mesos] class MesosExternalShuffleService(conf: SparkConf, securityManage protected override def newShuffleBlockHandler( conf: TransportConf): ExternalShuffleBlockHandler = { - new MesosExternalShuffleBlockHandler(conf) + val cleanerIntervalS = this.conf.getTimeAsSeconds("spark.shuffle.cleanerInterval", "30s") + new MesosExternalShuffleBlockHandler(conf, cleanerIntervalS) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index e1180980eed68..8a42fdaeab150 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -448,7 +448,9 @@ private[spark] class CoarseMesosSchedulerBackend( s"host ${slave.hostname}, port $externalShufflePort for app ${conf.getAppId}") mesosExternalShuffleClient.get - .registerDriverWithShuffleService(slave.hostname, externalShufflePort) + .registerDriverWithShuffleService(slave.hostname, externalShufflePort, + sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", + s"${sc.conf.getTimeAsMs("spark.network.timeout", "120s")}ms")) slave.shuffleRegistered = true } @@ -506,6 +508,9 @@ private[spark] class CoarseMesosSchedulerBackend( + "on the mesos nodes.") } + // Close the mesos external shuffle client if used + mesosExternalShuffleClient.foreach(_.close()) + if (mesosDriver != null) { mesosDriver.stop() } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala index dd76644288b4c..beb3bb1a2f663 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala @@ -192,7 +192,8 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite val status2 = createTaskStatus("1", "s1", TaskState.TASK_RUNNING) backend.statusUpdate(driver, status2) - verify(externalShuffleClient, times(1)).registerDriverWithShuffleService(anyString, anyInt) + verify(externalShuffleClient, times(1)) + .registerDriverWithShuffleService(anyString, anyInt, anyLong) } test("mesos kills an executor when told") { diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java new file mode 100644 index 0000000000000..73713c117e237 --- /dev/null +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java @@ -0,0 +1,36 @@ +package org.apache.spark.network.shuffle.protocol.mesos; + +import io.netty.buffer.ByteBuf; +import org.apache.spark.network.protocol.Encoders; +import org.apache.spark.network.shuffle.protocol.BlockTransferMessage; + +// Needed by ScalaDoc. See SPARK-7726 +import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type; + +/** + * A heartbeat sent from the driver to the MesosExternalShuffleService. + */ +public class ShuffleServiceHeartbeat extends BlockTransferMessage { + private final String appId; + + public ShuffleServiceHeartbeat(String appId) { + this.appId = appId; + } + + public String getAppId() { return appId; } + + @Override + protected Type type() { return Type.HEARTBEAT; } + + @Override + public int encodedLength() { return Encoders.Strings.encodedLength(appId); } + + @Override + public void encode(ByteBuf buf) { + Encoders.Strings.encode(buf, appId); + } + + public static ShuffleServiceHeartbeat decode(ByteBuf buf) { + return new ShuffleServiceHeartbeat(Encoders.Strings.decode(buf)); + } +} From b33c227fdb68ad0d9d22d7549f4ac17bde321fa3 Mon Sep 17 00:00:00 2001 From: Bertrand Bossy Date: Fri, 19 Feb 2016 19:15:56 +0100 Subject: [PATCH 2/5] SPARK-12583: Add missing license header --- .../protocol/mesos/ShuffleServiceHeartbeat.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java index 73713c117e237..b30bb9aed55b6 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.network.shuffle.protocol.mesos; import io.netty.buffer.ByteBuf; From c632c6c352a27bfb025c57862a775e7091e17fc1 Mon Sep 17 00:00:00 2001 From: Bertrand Bossy Date: Mon, 22 Feb 2016 19:25:56 +0100 Subject: [PATCH 3/5] SPARK-12583: Addressing comments --- .../mesos/MesosExternalShuffleClient.java | 17 ++++++++++------- .../mesos/MesosExternalShuffleService.scala | 12 ++++-------- .../mesos/CoarseMesosSchedulerBackend.scala | 7 +++++-- .../CoarseMesosSchedulerBackendSuite.scala | 2 +- 4 files changed, 20 insertions(+), 18 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java index 1374a0ee57211..9aac0660a92bb 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java @@ -65,28 +65,31 @@ public MesosExternalShuffleClient( super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled); } - public void registerDriverWithShuffleService(String host, int port, long heartbeatTimeoutMs) - throws IOException { + public void registerDriverWithShuffleService( + String host, + int port, + long heartbeatTimeoutMs, + long heartbeatIntervalMs) throws IOException { checkInit(); ByteBuffer registerDriver = new RegisterDriver(appId, heartbeatTimeoutMs).toByteBuffer(); TransportClient client = clientFactory.createClient(host, port); - client.sendRpc(registerDriver, new RegisterDriverCallback(client, heartbeatTimeoutMs)); + client.sendRpc(registerDriver, new RegisterDriverCallback(client, heartbeatIntervalMs)); } private class RegisterDriverCallback implements RpcResponseCallback { private final TransportClient client; - private final long heartbeatTimeoutMs; + private final long heartbeatIntervalMs; - private RegisterDriverCallback(TransportClient client, long heartbeatTimeoutMs) { + private RegisterDriverCallback(TransportClient client, long heartbeatIntervalMs) { this.client = client; - this.heartbeatTimeoutMs = heartbeatTimeoutMs; + this.heartbeatIntervalMs = heartbeatIntervalMs; } @Override public void onSuccess(ByteBuffer response) { heartbeaterThread.scheduleAtFixedRate( - new Heartbeater(client), 0, heartbeatTimeoutMs / 4, TimeUnit.MILLISECONDS); + new Heartbeater(client), 0, heartbeatIntervalMs, TimeUnit.MILLISECONDS); logger.info("Successfully registered app " + appId + " with external shuffle service."); } diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala index c57323affadb9..e78b90c3d7fa0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala @@ -79,24 +79,20 @@ private[mesos] class MesosExternalShuffleBlockHandler( /** An extractor object for matching [[RegisterDriver]] message. */ private object RegisterDriverParam { def unapply(r: RegisterDriver): Option[(String, AppState)] = - Some((r.getAppId, AppState(r.getHeartbeatTimeoutMs, System.nanoTime()))) + Some((r.getAppId, new AppState(r.getHeartbeatTimeoutMs, System.nanoTime()))) } private object Heartbeat { def unapply(h: ShuffleServiceHeartbeat): Option[String] = Some(h.getAppId) } - private case class AppState(heartbeatTimeout: Long, var lastHeartbeat: Long) + private class AppState(val heartbeatTimeout: Long, @volatile var lastHeartbeat: Long) private class CleanerThread extends Runnable { override def run(): Unit = { val now = System.nanoTime() - val appsToRemove = connectedApps.asScala.filter{ case (address, appState) => - now - appState.lastHeartbeat > appState.heartbeatTimeout * 1000 * 1000 - } - appsToRemove.foreach { case (appId, appState) => - val nanosSinceLastHeartbeat = System.nanoTime() - appState.lastHeartbeat - if (nanosSinceLastHeartbeat > appState.heartbeatTimeout * 1000 * 1000) { + connectedApps.asScala.foreach { case (appId, appState) => + if (now - appState.lastHeartbeat > appState.heartbeatTimeout * 1000 * 1000) { logInfo(s"Application $appId timed out. Removing shuffle files.") connectedApps.remove(appId) applicationRemoved(appId, true) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 8a42fdaeab150..90b1813750be7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -448,9 +448,12 @@ private[spark] class CoarseMesosSchedulerBackend( s"host ${slave.hostname}, port $externalShufflePort for app ${conf.getAppId}") mesosExternalShuffleClient.get - .registerDriverWithShuffleService(slave.hostname, externalShufflePort, + .registerDriverWithShuffleService( + slave.hostname, + externalShufflePort, sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", - s"${sc.conf.getTimeAsMs("spark.network.timeout", "120s")}ms")) + s"${sc.conf.getTimeAsMs("spark.network.timeout", "120s")}ms"), + sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")) slave.shuffleRegistered = true } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala index beb3bb1a2f663..b18f0eb162b1d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala @@ -193,7 +193,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite val status2 = createTaskStatus("1", "s1", TaskState.TASK_RUNNING) backend.statusUpdate(driver, status2) verify(externalShuffleClient, times(1)) - .registerDriverWithShuffleService(anyString, anyInt, anyLong) + .registerDriverWithShuffleService(anyString, anyInt, anyLong, anyLong) } test("mesos kills an executor when told") { From 2f6c0e68640e7d9ce1b9cd882d9f90041706c85e Mon Sep 17 00:00:00 2001 From: Bertrand Bossy Date: Tue, 23 Feb 2016 07:25:30 +0100 Subject: [PATCH 4/5] SPARK-12583: Address comment: style --- .../spark/deploy/mesos/MesosExternalShuffleService.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala index e78b90c3d7fa0..69455e8cf1511 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala @@ -36,7 +36,8 @@ import org.apache.spark.util.ThreadUtils * It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]]. */ private[mesos] class MesosExternalShuffleBlockHandler( - transportConf: TransportConf, cleanerIntervalS: Long) + transportConf: TransportConf, + cleanerIntervalS: Long) extends ExternalShuffleBlockHandler(transportConf, null) with Logging { ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-cleaner-watcher") From 3a9914b238d7059982c44359223b7aea2d743107 Mon Sep 17 00:00:00 2001 From: Bertrand Bossy Date: Sat, 12 Mar 2016 13:33:06 +0100 Subject: [PATCH 5/5] SPARK-12583: Address comments & rebase --- .../spark/network/shuffle/mesos/MesosExternalShuffleClient.java | 1 + .../network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java | 0 .../apache/spark/deploy/mesos/MesosExternalShuffleService.scala | 2 +- 3 files changed, 2 insertions(+), 1 deletion(-) rename {network/shuffle => common/network-shuffle}/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java (100%) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java index 9aac0660a92bb..2add9c83a73d2 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java @@ -116,6 +116,7 @@ private Heartbeater(TransportClient client) { @Override public void run() { + // TODO: Stop sending heartbeats if the shuffle service has lost the app due to timeout client.send(new ShuffleServiceHeartbeat(appId).toByteBuffer()); } } diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java similarity index 100% rename from network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java rename to common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala index 69455e8cf1511..c0f9129a423f9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala @@ -113,7 +113,7 @@ private[mesos] class MesosExternalShuffleService(conf: SparkConf, securityManage protected override def newShuffleBlockHandler( conf: TransportConf): ExternalShuffleBlockHandler = { - val cleanerIntervalS = this.conf.getTimeAsSeconds("spark.shuffle.cleanerInterval", "30s") + val cleanerIntervalS = this.conf.getTimeAsSeconds("spark.shuffle.cleaner.interval", "30s") new MesosExternalShuffleBlockHandler(conf, cleanerIntervalS) } }