From 50b1d05614b1c783997ca3c64736798d25296927 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Tue, 24 Dec 2019 22:58:41 +0900
Subject: [PATCH 1/8] [SPARK-30348][CORE] Fix flaky test failure on
 "MasterSuite.SPARK-27510: Master should avoid ..."

---
 .../main/scala/org/apache/spark/deploy/master/Master.scala | 3 ++-
 .../scala/org/apache/spark/deploy/master/MasterSuite.scala | 6 +++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 8d3795cae707a..a2854c7072ad5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -75,7 +75,8 @@ private[deploy] class Master(
   private val endpointToApp = new HashMap[RpcEndpointRef, ApplicationInfo]
   private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
 
-  private val completedApps = new ArrayBuffer[ApplicationInfo]
+  // Expose for testing
+  private[master] val completedApps = new ArrayBuffer[ApplicationInfo]
   private var nextAppNumber = 0
 
   private val drivers = new HashSet[DriverInfo]
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 69bcda82f7081..00b2d107223a6 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -682,7 +682,11 @@ class MasterSuite extends SparkFunSuite
         // an app would be registered with Master once Driver set up
         assert(worker.apps.nonEmpty)
         appId = worker.apps.head._1
-        assert(master.idToApp.contains(appId))
+
+        // We found a case where the test ran so fast that all steps were done within
+        // a single interval - in that case, we have to check whether the app is still
+        // available in master or already marked as completed. See SPARK-30348 for details.
+        assert(master.idToApp.contains(appId) || master.completedApps.exists(_.id == appId))
       }
 
       eventually(timeout(10.seconds)) {

From ccca5f572d54da8d19fa131f5c2298f14973eb82 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Wed, 25 Dec 2019 08:05:54 +0900
Subject: [PATCH 2/8] Avoid exposing completedApps by leveraging
 PrivateMethodTester

---
 .../main/scala/org/apache/spark/deploy/master/Master.scala | 3 +--
 .../scala/org/apache/spark/deploy/master/MasterSuite.scala | 6 ++++--
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index a2854c7072ad5..8d3795cae707a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -75,8 +75,7 @@ private[deploy] class Master(
   private val endpointToApp = new HashMap[RpcEndpointRef, ApplicationInfo]
   private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
 
-  // Expose for testing
-  private[master] val completedApps = new ArrayBuffer[ApplicationInfo]
+  private val completedApps = new ArrayBuffer[ApplicationInfo]
   private var nextAppNumber = 0
 
   private val drivers = new HashSet[DriverInfo]
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 00b2d107223a6..cd0d9cf270436 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.mutable.{HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.duration._
 import scala.io.Source
 import scala.reflect.ClassTag
@@ -546,6 +546,7 @@ class MasterSuite extends SparkFunSuite
     PrivateMethod[Array[Int]](Symbol("scheduleExecutorsOnWorkers"))
   private val _drivers = PrivateMethod[HashSet[DriverInfo]](Symbol("drivers"))
   private val _state = PrivateMethod[RecoveryState.Value](Symbol("state"))
+  private val _completedApps = PrivateMethod[ArrayBuffer[ApplicationInfo]](Symbol("completedApps"))
 
   private val workerInfo = makeWorkerInfo(4096, 10)
   private val workerInfos = Array(workerInfo, workerInfo, workerInfo)
@@ -686,7 +687,8 @@ class MasterSuite extends SparkFunSuite
         // We found a case where the test ran so fast that all steps were done within
         // a single interval - in that case, we have to check whether the app is still
         // available in master or already marked as completed. See SPARK-30348 for details.
-        assert(master.idToApp.contains(appId) || master.completedApps.exists(_.id == appId))
+        val completedApps = master.invokePrivate(_completedApps())
+        assert(master.idToApp.contains(appId) || completedApps.exists(_.id == appId))
       }
 
       eventually(timeout(10.seconds)) {

From 0af3a598f64cdc9e57828070989ca110a74c735e Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Wed, 25 Dec 2019 21:55:39 +0900
Subject: [PATCH 3/8] Add CountDownLatch to try to verify all steps
 sequentially

---
 .../spark/deploy/master/MasterSuite.scala     | 41 +++++++++++--------
 1 file changed, 23 insertions(+), 18 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index cd0d9cf270436..bde34169bcdb4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy.master
 
 import java.util.Date
-import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
@@ -97,15 +97,29 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
   }
 }
 
-class MockExecutorLaunchFailWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf)
-  extends MockWorker(master, conf) {
+class MockExecutorLaunchFailWorker(master: Master, conf: SparkConf = new SparkConf)
+  extends MockWorker(master.self, conf) {
+
+  val launchExecutorReceived = new CountDownLatch(1)
   var failedCnt = 0
   override def receive: PartialFunction[Any, Unit] = {
     case LaunchExecutor(_, appId, execId, _, _, _, _) =>
+      if (failedCnt == 0) {
+        launchExecutorReceived.countDown()
+      }
+      verifyMasterState(appId)
       failedCnt += 1
-      master.send(ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None))
+      master.self.send(ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None))
+
     case otherMsg => super.receive(otherMsg)
   }
+
+  private def verifyMasterState(appId: String): Unit = {
+    // The app would be registered with Master once the Driver is set up.
+    // We verify the state of Master here to avoid a timing issue, as this guarantees the
+    // verification runs before Master changes the status of the app to failed.
+    assert(master.idToApp.contains(appId))
+  }
 }
 
 class MasterSuite extends SparkFunSuite
@@ -546,7 +560,6 @@ class MasterSuite extends SparkFunSuite
     PrivateMethod[Array[Int]](Symbol("scheduleExecutorsOnWorkers"))
   private val _drivers = PrivateMethod[HashSet[DriverInfo]](Symbol("drivers"))
   private val _state = PrivateMethod[RecoveryState.Value](Symbol("state"))
-  private val _completedApps = PrivateMethod[ArrayBuffer[ApplicationInfo]](Symbol("completedApps"))
 
   private val workerInfo = makeWorkerInfo(4096, 10)
   private val workerInfos = Array(workerInfo, workerInfo, workerInfo)
@@ -663,7 +676,7 @@ class MasterSuite extends SparkFunSuite
     val master = makeAliveMaster()
    var worker: MockExecutorLaunchFailWorker = null
     try {
-      worker = new MockExecutorLaunchFailWorker(master.self)
+      worker = new MockExecutorLaunchFailWorker(master)
       worker.rpcEnv.setupEndpoint("worker", worker)
       val workerRegMsg = RegisterWorker(
         worker.id,
@@ -678,19 +691,11 @@ class MasterSuite extends SparkFunSuite
       val driver = DeployTestUtils.createDriverDesc()
       // mimic DriverClient to send RequestSubmitDriver to master
       master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
-      var appId: String = null
-      eventually(timeout(10.seconds)) {
-        // an app would be registered with Master once Driver set up
-        assert(worker.apps.nonEmpty)
-        appId = worker.apps.head._1
-
-        // We found a case where the test ran so fast that all steps were done within
-        // a single interval - in that case, we have to check whether the app is still
-        // available in master or already marked as completed. See SPARK-30348 for details.
-        val completedApps = master.invokePrivate(_completedApps())
-        assert(master.idToApp.contains(appId) || completedApps.exists(_.id == appId))
-      }
+      // LaunchExecutor message should have been received on the worker side
+      assert(worker.launchExecutorReceived.await(10, TimeUnit.SECONDS))
+
+      val appId: String = worker.apps.head._1
       eventually(timeout(10.seconds)) {
         // Master would continually launch executors until reach MAX_EXECUTOR_RETRIES
         assert(worker.failedCnt == master.conf.get(MAX_EXECUTOR_RETRIES))

From 3ee2cc2af3b6a2c6d03206c8230f314dbf8d9f7e Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Thu, 26 Dec 2019 07:27:10 +0900
Subject: [PATCH 4/8] Fix another timing issue

---
 .../scala/org/apache/spark/deploy/master/MasterSuite.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index bde34169bcdb4..38b0201094e3c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -101,6 +101,7 @@ class MockExecutorLaunchFailWorker(master: Master, conf: SparkConf = new SparkCo
   extends MockWorker(master.self, conf) {
 
   val launchExecutorReceived = new CountDownLatch(1)
+  val appIdsToLaunchExecutor = new mutable.HashSet[String]
   var failedCnt = 0
   override def receive: PartialFunction[Any, Unit] = {
     case LaunchExecutor(_, appId, execId, _, _, _, _) =>
@@ -108,6 +109,7 @@ class MockExecutorLaunchFailWorker(master: Master, conf: SparkConf = new SparkCo
         launchExecutorReceived.countDown()
       }
       verifyMasterState(appId)
+      appIdsToLaunchExecutor += appId
       failedCnt += 1
       master.self.send(ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None))
 
@@ -695,12 +697,12 @@ class MasterSuite extends SparkFunSuite
       // LaunchExecutor message should have been received on the worker side
       assert(worker.launchExecutorReceived.await(10, TimeUnit.SECONDS))
 
-      val appId: String = worker.apps.head._1
       eventually(timeout(10.seconds)) {
+        val appIds = worker.appIdsToLaunchExecutor
         // Master would continually launch executors until reach MAX_EXECUTOR_RETRIES
         assert(worker.failedCnt == master.conf.get(MAX_EXECUTOR_RETRIES))
         // Master would remove the app if no executor could be launched for it
-        assert(!master.idToApp.contains(appId))
+        assert(master.idToApp.keySet.intersect(appIds).isEmpty)
       }
     } finally {
       if (worker != null) {

From 07d78cc967ec072541f0643628c07bfe1dad6ed2 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Thu, 26 Dec 2019 08:30:52 +0900
Subject: [PATCH 5/8] Add verification on expectation for registering app

---
 .../spark/deploy/master/MasterSuite.scala | 32 +++++++++++++------
 1 file changed, 23 insertions(+), 9 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 38b0201094e3c..61589d5d261a9 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -97,31 +97,42 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
   }
 }
 
+// This class is designed to handle the lifecycle of only one application.
 class MockExecutorLaunchFailWorker(master: Master, conf: SparkConf = new SparkConf)
-  extends MockWorker(master.self, conf) {
+  extends MockWorker(master.self, conf) with Eventually {
 
+  val appRegistered = new CountDownLatch(1)
   val launchExecutorReceived = new CountDownLatch(1)
   val appIdsToLaunchExecutor = new mutable.HashSet[String]
   var failedCnt = 0
+
   override def receive: PartialFunction[Any, Unit] = {
+    case LaunchDriver(driverId, desc, resources_) =>
+      drivers += driverId
+      driverResources(driverId) = resources_.map(r => (r._1, r._2.addresses.toSet))
+      master.self.send(RegisterApplication(appDesc, newDriver(driverId)))
+
+      // The code below doesn't make the driver stuck, as newDriver opens another rpc endpoint for
+      // handling driver related messages. It guarantees that registering the application is done
+      // before handling the LaunchExecutor message.
+      eventually(timeout(10.seconds)) {
+        // an app would be registered with Master once Driver set up
+        assert(apps.nonEmpty)
+        assert(master.idToApp.keySet.intersect(apps.keySet) == apps.keySet)
+      }
+
+      appRegistered.countDown()
     case LaunchExecutor(_, appId, execId, _, _, _, _) =>
       if (failedCnt == 0) {
        launchExecutorReceived.countDown()
       }
-      verifyMasterState(appId)
+      assert(master.idToApp.contains(appId))
       appIdsToLaunchExecutor += appId
       failedCnt += 1
       master.self.send(ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None))
 
     case otherMsg => super.receive(otherMsg)
   }
-
-  private def verifyMasterState(appId: String): Unit = {
-    // The app would be registered with Master once the Driver is set up.
-    // We verify the state of Master here to avoid a timing issue, as this guarantees the
-    // verification runs before Master changes the status of the app to failed.
-    assert(master.idToApp.contains(appId))
-  }
 }
 
 class MasterSuite extends SparkFunSuite
@@ -694,6 +705,9 @@ class MasterSuite extends SparkFunSuite
       // mimic DriverClient to send RequestSubmitDriver to master
       master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
 
+      // A new application should be registered
+      assert(worker.appRegistered.await(10, TimeUnit.SECONDS))
+
       // LaunchExecutor message should have been received on the worker side
       assert(worker.launchExecutorReceived.await(10, TimeUnit.SECONDS))
 

From 3a0d5839c4a9b6905b23bfed5271b5653e56ba41 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Thu, 26 Dec 2019 12:07:32 +0900
Subject: [PATCH 6/8] Fix

---
 .../org/apache/spark/deploy/master/MasterSuite.scala | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 61589d5d261a9..766db154a7550 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -113,9 +113,9 @@ class MockExecutorLaunchFailWorker(master: Master, conf: SparkConf = new SparkCo
       master.self.send(RegisterApplication(appDesc, newDriver(driverId)))
 
       // The code below doesn't make the driver stuck, as newDriver opens another rpc endpoint for
-      // handling driver related messages. It guarantees that registering the application is done
-      // before handling the LaunchExecutor message.
-      eventually(timeout(10.seconds)) {
+      // handling driver related messages. To simplify the logic, we block handling the
+      // LaunchExecutor message until we validate that registering the app succeeded.
+      eventually(timeout(5.seconds)) {
         // an app would be registered with Master once Driver set up
         assert(apps.nonEmpty)
         assert(master.idToApp.keySet.intersect(apps.keySet) == apps.keySet)
@@ -123,6 +123,8 @@ class MockExecutorLaunchFailWorker(master: Master, conf: SparkConf = new SparkCo
 
       appRegistered.countDown()
     case LaunchExecutor(_, appId, execId, _, _, _, _) =>
+      assert(appRegistered.await(10, TimeUnit.SECONDS))
+
       if (failedCnt == 0) {
         launchExecutorReceived.countDown()
       }
@@ -705,9 +707,6 @@ class MasterSuite extends SparkFunSuite
       // mimic DriverClient to send RequestSubmitDriver to master
       master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
 
-      // A new application should be registered
-      assert(worker.appRegistered.await(10, TimeUnit.SECONDS))
-
       // LaunchExecutor message should have been received on the worker side
       assert(worker.launchExecutorReceived.await(10, TimeUnit.SECONDS))
 

From c4775fc0b6d843a725a370095750b2ef86ae6c2f Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Thu, 26 Dec 2019 12:09:57 +0900
Subject: [PATCH 7/8] refine

---
 .../test/scala/org/apache/spark/deploy/master/MasterSuite.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 766db154a7550..38eb20fd8fa1b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -108,8 +108,6 @@ class MockExecutorLaunchFailWorker(master: Master, conf: SparkConf = new SparkCo
 
   override def receive: PartialFunction[Any, Unit] = {
     case LaunchDriver(driverId, desc, resources_) =>
-      drivers += driverId
-      driverResources(driverId) = resources_.map(r => (r._1, r._2.addresses.toSet))
       master.self.send(RegisterApplication(appDesc, newDriver(driverId)))
 
       // The code below doesn't make the driver stuck, as newDriver opens another rpc endpoint for

From 8b7b1a1f7dae324aeff676340567c8c319604236 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Thu, 26 Dec 2019 14:30:33 +0900
Subject: [PATCH 8/8] Address nitpick

---
 .../test/scala/org/apache/spark/deploy/master/MasterSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 38eb20fd8fa1b..0cf573c2490b3 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -107,7 +107,7 @@ class MockExecutorLaunchFailWorker(master: Master, conf: SparkConf = new SparkCo
   var failedCnt = 0
 
   override def receive: PartialFunction[Any, Unit] = {
-    case LaunchDriver(driverId, desc, resources_) =>
+    case LaunchDriver(driverId, _, _) =>
       master.self.send(RegisterApplication(appDesc, newDriver(driverId)))
 
       // The code below doesn't make the driver stuck, as newDriver opens another rpc endpoint for
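
Addendum: two standalone sketches of the techniques the series uses, for experimenting outside the Spark tree. Both are illustrative only - names such as Registry, completed, handleRegisterApplication, and handleLaunchExecutor are made-up stand-ins, not Spark APIs.

The first sketch shows the PrivateMethodTester approach from PATCH 2: reading private state in a test reflectively instead of widening its visibility, assuming ScalaTest is on the classpath. Note that it works on a private val, which is exactly how PATCH 2 reads Master's private completedApps buffer.

```scala
import org.scalatest.PrivateMethodTester

// Registry stands in for Master; completed for its private completedApps buffer.
object PrivateStateDemo extends App with PrivateMethodTester {
  class Registry {
    private val completed = List("app-20191224000000-0000")
  }

  // Look up the private accessor by name; the type parameter is the expected result type.
  val _completed = PrivateMethod[List[String]](Symbol("completed"))
  val registry = new Registry

  // invokePrivate calls the private member reflectively, so Registry's public API stays unchanged.
  assert((registry invokePrivate _completed()) == List("app-20191224000000-0000"))
  println("private state read: " + (registry invokePrivate _completed()))
}
```

The second sketch shows the CountDownLatch gating that PATCH 3 through PATCH 6 converge on: a handler that must not run its assertions until an earlier step has happened blocks on a latch, and the test thread likewise awaits a latch instead of polling state. Plain JVM threads replace Spark's rpc endpoints here, so this is a minimal sketch of the pattern rather than the suite's actual wiring.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object LatchGatingSketch {
  // Counted down once "registration" is done; mirrors appRegistered above.
  private val appRegistered = new CountDownLatch(1)
  // Counted down once the first "launch" arrives; mirrors launchExecutorReceived.
  private val launchReceived = new CountDownLatch(1)

  // "Driver" side: finish registration, then release the gate.
  def handleRegisterApplication(appId: String): Unit = {
    println(s"registered $appId")
    appRegistered.countDown()
  }

  // "Worker" side: block until registration is known to be done, so the
  // assertion can never race with it - the core of the flakiness fix.
  def handleLaunchExecutor(appId: String): Unit = {
    assert(appRegistered.await(10, TimeUnit.SECONDS), "registration never completed")
    println(s"launching executor for $appId")
    launchReceived.countDown()
  }

  def main(args: Array[String]): Unit = {
    val worker = new Thread(() => handleLaunchExecutor("app-1"))
    worker.start() // may reach the gate before registration happens; it will simply wait
    handleRegisterApplication("app-1")
    // The "test" thread awaits a latch instead of sleeping or re-checking state.
    assert(launchReceived.await(10, TimeUnit.SECONDS))
    worker.join()
  }
}
```

The design point the series lands on: a latch records "this step has happened" exactly once, whereas the eventually-based snapshot checks in PATCH 1 and PATCH 2 could still race with the app being moved to completedApps between retries.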