From e16067e7ee2f906dfed65b5418527ab1f5db691e Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 18 Apr 2019 20:49:36 +0800 Subject: [PATCH 1/3] [SPARK-27460][TESTS] Running slowest test suites in their own forked JVMs for higher parallelism This patch modifies SparkBuild so that the largest / slowest test suites (or collections of suites) can run in their own forked JVMs, allowing them to be run in parallel with each other. This opt-in / whitelisting approach allows us to increase parallelism without having to fix a long tail of flakiness / brittleness issues in tests which aren't performance bottlenecks. See comments in SparkBuild.scala for details, including a summary of why we sometimes opt to run entire groups of tests in a single forked JVM. The time of a full new-pull-request test run in Jenkins is reduced by around 53%: before the changes, 4hr 40min; after the changes, 2hr 13min. Unit test Closes #24373 from gengliangwang/parallelTest. Authored-by: Gengliang Wang Signed-off-by: Wenchen Fan --- project/SparkBuild.scala | 94 ++++++++++++++++++- .../apache/spark/sql/SQLQueryTestSuite.scala | 5 +- .../spark/sql/test/SharedSparkSession.scala | 7 +- 3 files changed, 100 insertions(+), 6 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 56b27fab765a6..7550c7253f74e 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -430,6 +430,83 @@ object SparkBuild extends PomBuild { else x.settings(Seq[Setting[_]](): _*) } ++ Seq[Project](OldDeps.project) } + + if (!sys.env.contains("SERIAL_SBT_TESTS")) { + allProjects.foreach(enable(SparkParallelTestGrouping.settings)) + } +} + +object SparkParallelTestGrouping { + // Settings for parallelizing tests. The basic strategy here is to run the slowest suites (or + // collections of suites) in their own forked JVMs, allowing us to gain parallelism within an + // SBT project. Here, we take a whitelisting approach where the default behavior is to run all + // tests sequentially in a single JVM, requiring us to manually opt in to the extra parallelism. + // + // There are a few reasons why such a whitelist approach is good: + // + // 1. Launching one JVM per suite adds significant overhead for short-running suites. In + // addition to JVM startup time and JIT warmup, it appears that initialization of Derby + // metastores can be very slow, so creating a fresh warehouse per suite is inefficient. + // + // 2. When parallelizing within a project we need to give each forked JVM a different tmpdir + // so that the metastore warehouses do not collide. Unfortunately, it seems that there are + // some tests which have an overly tight dependency on the default tmpdir, so those fragile + // tests need to continue re-running in the default configuration (or need to be rewritten). + // Fixing that problem would be a huge amount of work for limited payoff in most cases + // because most test suites are short-running.
+ // + + private val testsWhichShouldRunInTheirOwnDedicatedJvm = Set( + "org.apache.spark.DistributedSuite", + "org.apache.spark.sql.catalyst.expressions.DateExpressionsSuite", + "org.apache.spark.sql.catalyst.expressions.HashExpressionsSuite", + "org.apache.spark.sql.catalyst.expressions.CastSuite", + "org.apache.spark.sql.catalyst.expressions.MathExpressionsSuite", + "org.apache.spark.sql.hive.HiveExternalCatalogSuite", + "org.apache.spark.sql.hive.StatisticsSuite", + "org.apache.spark.sql.hive.execution.HiveCompatibilitySuite", + "org.apache.spark.sql.hive.client.VersionsSuite", + "org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite", + "org.apache.spark.ml.classification.LogisticRegressionSuite", + "org.apache.spark.ml.classification.LinearSVCSuite", + "org.apache.spark.sql.SQLQueryTestSuite" + ) + + private val DEFAULT_TEST_GROUP = "default_test_group" + + private def testNameToTestGroup(name: String): String = name match { + case _ if testsWhichShouldRunInTheirOwnDedicatedJvm.contains(name) => name + case _ => DEFAULT_TEST_GROUP + } + + lazy val settings = Seq( + testGrouping in Test := { + val tests: Seq[TestDefinition] = (definedTests in Test).value + val defaultForkOptions = ForkOptions( + bootJars = Nil, + javaHome = javaHome.value, + connectInput = connectInput.value, + outputStrategy = outputStrategy.value, + runJVMOptions = (javaOptions in Test).value, + workingDirectory = Some(baseDirectory.value), + envVars = (envVars in Test).value + ) + tests.groupBy(test => testNameToTestGroup(test.name)).map { case (groupName, groupTests) => + val forkOptions = { + if (groupName == DEFAULT_TEST_GROUP) { + defaultForkOptions + } else { + defaultForkOptions.copy(runJVMOptions = defaultForkOptions.runJVMOptions ++ + Seq(s"-Djava.io.tmpdir=${baseDirectory.value}/target/tmp/$groupName")) + } + } + new Tests.Group( + name = groupName, + tests = groupTests, + runPolicy = Tests.SubProcess(forkOptions)) + } + }.toSeq + ) } object Core { @@ -844,8 +921,14 @@ object TestSettings { testOptions in Test += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"), // Enable Junit testing. libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % "test", - // Only allow one test at a time, even across projects, since they run in the same JVM - parallelExecution in Test := false, + // `parallelExecutionInTest` controls whether test suites belonging to the same SBT project + // can run in parallel with one another. It does NOT control whether tests execute in parallel + // within the same JVM (which is controlled by `testForkedParallel`) or whether test cases + // within the same suite can run in parallel (which is a ScalaTest runner option which is passed + // to the underlying runner but is not a SBT-level configuration). This needs to be `true` in + // order for the extra parallelism enabled by `SparkParallelTestGrouping` to take effect. + // The `SERIAL_SBT_TESTS` check is here so the extra parallelism can be feature-flagged. + parallelExecution in Test := { if (sys.env.contains("SERIAL_SBT_TESTS")) false else true }, // Make sure the test temp directory exists. resourceGenerators in Test += Def.macroValueI(resourceManaged in Test map { outDir: File => var dir = new File(testTempDir) @@ -866,7 +949,12 @@ object TestSettings { } Seq.empty[File] }).value, - concurrentRestrictions in Global += Tags.limit(Tags.Test, 1) + concurrentRestrictions in Global := { + // The number of concurrent test groups is empirically chosen based on experience + // with Jenkins flakiness. 
+ if (sys.env.contains("SERIAL_SBT_TESTS")) (concurrentRestrictions in Global).value + else Seq(Tags.limit(Tags.ForkedTestGroup, 4)) + } ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index ab817ff726dec..f3a741f10b40c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -267,9 +267,12 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext { val df = session.sql(sql) val schema = df.schema val notIncludedMsg = "[not included in comparison]" + val clsName = this.getClass.getCanonicalName // Get answer, but also get rid of the #1234 expression ids that show up in explain plans val answer = df.queryExecution.hiveResultString().map(_.replaceAll("#\\d+", "#x") - .replaceAll("Location.*/sql/core/", s"Location ${notIncludedMsg}sql/core/") + .replaceAll( + s"Location.*/sql/core/spark-warehouse/$clsName/", + s"Location ${notIncludedMsg}sql/core/spark-warehouse/") .replaceAll("Created By.*", s"Created By $notIncludedMsg") .replaceAll("Created Time.*", s"Created Time $notIncludedMsg") .replaceAll("Last Access.*", s"Last Access $notIncludedMsg") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala index e7e0ce64963a3..efdbc7e7ff868 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala @@ -25,7 +25,7 @@ import org.scalatest.concurrent.Eventually import org.apache.spark.{DebugFilesystem, SparkConf} import org.apache.spark.sql.{SparkSession, SQLContext} import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} /** * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]]. @@ -36,7 +36,7 @@ trait SharedSparkSession with Eventually { self: Suite => protected def sparkConf = { - new SparkConf() + val conf = new SparkConf() .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName) .set("spark.unsafe.exceptionOnMemoryLeak", "true") .set(SQLConf.CODEGEN_FALLBACK.key, "false") @@ -45,6 +45,9 @@ trait SharedSparkSession // this rule may potentially block testing of other optimization rules such as // ConstantPropagation etc. .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) + conf.set( + StaticSQLConf.WAREHOUSE_PATH, + conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + getClass.getCanonicalName) } /** From 0896cb548e1c1e1c48b74a1180be8e38486dee6d Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 18 Apr 2019 15:37:55 -0700 Subject: [PATCH 2/3] [SPARK-27460][TESTS][FOLLOWUP] Add HiveClientVersions to parallel test suite list The test time of `HiveClientVersions` is around 3.5 minutes. This PR is to add it into the parallel test suite list. To make sure there is no colliding warehouse location, we can change the warehouse path to a temporary directory. Unit test Closes #24404 from gengliangwang/parallelTestFollowUp. 
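For readers less familiar with sbt's test-grouping API, the mechanism the first patch introduces (and which this follow-up extends by whitelisting one more suite) can be distilled into the standalone sketch below. It is illustrative only: the project layout and suite names are hypothetical, and it simply consolidates the testGrouping / Tests.Group / Tests.SubProcess / Tags.ForkedTestGroup pieces that the patch spreads across SparkBuild.scala, using the same sbt 0.13.x API:

// build.sbt-style sketch (sbt 0.13.x); suite names are hypothetical.
val suitesNeedingTheirOwnJvm = Set(
  "com.example.VerySlowSuite",
  "com.example.AnotherSlowSuite")

testGrouping in Test := {
  val testBaseDirectory = baseDirectory.value
  val defaultForkOptions = ForkOptions(
    bootJars = Nil,
    javaHome = javaHome.value,
    connectInput = connectInput.value,
    outputStrategy = outputStrategy.value,
    runJVMOptions = (javaOptions in Test).value,
    workingDirectory = Some(testBaseDirectory),
    envVars = (envVars in Test).value)
  (definedTests in Test).value
    .groupBy(t => if (suitesNeedingTheirOwnJvm(t.name)) t.name else "default_test_group")
    .map { case (groupName, groupTests) =>
      // Dedicated groups get their own java.io.tmpdir so per-JVM scratch state
      // (e.g. Derby metastore warehouses) cannot collide across concurrently running JVMs.
      val forkOptions =
        if (groupName == "default_test_group") defaultForkOptions
        else defaultForkOptions.copy(runJVMOptions = defaultForkOptions.runJVMOptions ++
          Seq(s"-Djava.io.tmpdir=$testBaseDirectory/target/tmp/$groupName"))
      new Tests.Group(name = groupName, tests = groupTests, runPolicy = Tests.SubProcess(forkOptions))
    }
    .toSeq
}

// Without these two settings sbt would still schedule the forked groups one at a time.
parallelExecution in Test := true
concurrentRestrictions in Global := Seq(Tags.limit(Tags.ForkedTestGroup, 4))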
Authored-by: Gengliang Wang Signed-off-by: Dongjoon Hyun --- project/SparkBuild.scala | 1 + .../org/apache/spark/sql/hive/client/HiveClientSuite.scala | 2 ++ 2 files changed, 3 insertions(+) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 7550c7253f74e..1edbe17fa7ef6 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -466,6 +466,7 @@ object SparkParallelTestGrouping { "org.apache.spark.sql.hive.StatisticsSuite", "org.apache.spark.sql.hive.execution.HiveCompatibilitySuite", "org.apache.spark.sql.hive.client.VersionsSuite", + "org.apache.spark.sql.hive.client.HiveClientVersions", "org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite", "org.apache.spark.ml.classification.LogisticRegressionSuite", "org.apache.spark.ml.classification.LinearSVCSuite", diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala index fa9f753795f65..5bdb13aec0129 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.{BooleanType, IntegerType, LongType} +import org.apache.spark.util.Utils // TODO: Refactor this to `HivePartitionFilteringSuite` class HiveClientSuite(version: String) @@ -45,6 +46,7 @@ class HiveClientSuite(version: String) val hadoopConf = new Configuration() hadoopConf.setBoolean(tryDirectSqlKey, tryDirectSql) + hadoopConf.set("hive.metastore.warehouse.dir", Utils.createTempDir().toURI().toString()) val client = buildClient(hadoopConf) client.runSqlHive("CREATE TABLE test (value INT) PARTITIONED BY (ds INT, h INT, chunk STRING)") From 8c1469802ecad66c5abbddcc76931b982b81000d Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 24 Apr 2019 17:36:29 +0800 Subject: [PATCH 3/3] [SPARK-27460][FOLLOW-UP][TESTS] Fix flaky tests This patch makes several test flakiness fixes. N/A Closes #24434 from gatorsmile/fixFlakyTest. 
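Before moving on to the flaky-test fixes, it is worth consolidating what the warehouse-isolation changes in the first two patches amount to: once suites run in concurrently forked JVMs, they must not share a single Derby metastore / spark-warehouse directory. A minimal sketch of the two tactics follows; the helper object and method names are hypothetical, and only the configuration keys and the per-suite / throwaway-directory idea come from the patches:

import java.nio.file.Files

import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkConf

// Hypothetical helper, for illustration only.
object WarehouseIsolation {

  // SharedSparkSession-style: append the suite's class name to the SQL warehouse path,
  // so each suite writes under its own subdirectory (spark-warehouse/<SuiteName>/...).
  def warehousePerSuite(conf: SparkConf, suite: Class[_]): SparkConf = {
    val current = conf.get("spark.sql.warehouse.dir", "spark-warehouse")
    conf.set("spark.sql.warehouse.dir", current + "/" + suite.getCanonicalName)
  }

  // HiveClientSuite-style: point the Hive metastore warehouse at a fresh temporary
  // directory before the client is built, so no two suites ever resolve the same path.
  def throwawayHiveWarehouse(hadoopConf: Configuration): Unit = {
    hadoopConf.set(
      "hive.metastore.warehouse.dir",
      Files.createTempDirectory("hive-warehouse-").toUri.toString)
  }
}

The SQLQueryTestSuite regex change earlier exists precisely because of the first tactic: once the warehouse path contains the suite's class name, that class-name component has to be stripped from Location lines before comparing explain output against the golden result files.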
Lead-authored-by: gatorsmile Co-authored-by: Hyukjin Kwon Signed-off-by: Wenchen Fan --- .../ExecutorAllocationManagerSuite.scala | 18 +++++--- .../apache/spark/SparkContextInfoSuite.scala | 9 +++- .../org/apache/spark/SparkFunSuite.scala | 46 ++++++++++++++++++- .../org/apache/spark/StatusTrackerSuite.scala | 2 +- .../history/FsHistoryProviderSuite.scala | 20 ++++---- .../SparkListenerWithClusterSuite.scala | 10 ++-- .../deploy/yarn/BaseYarnClusterSuite.scala | 2 +- .../spark/deploy/yarn/YarnClusterSuite.scala | 4 +- .../ui/SQLAppStatusListenerSuite.scala | 3 +- .../sql/streaming/FileStreamSourceSuite.scala | 2 +- .../spark/sql/streaming/StreamTest.scala | 2 +- .../StreamingQueryManagerSuite.scala | 17 ++++--- .../spark/streaming/ReceiverSuite.scala | 2 +- 13 files changed, 99 insertions(+), 38 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index a69045f119d3a..df5d2658e8c7c 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import org.mockito.Matchers.{any, eq => meq} import org.mockito.Mockito.{mock, never, verify, when} -import org.scalatest.{BeforeAndAfter, PrivateMethodTester} +import org.scalatest.PrivateMethodTester import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.config @@ -37,20 +37,24 @@ import org.apache.spark.util.ManualClock */ class ExecutorAllocationManagerSuite extends SparkFunSuite - with LocalSparkContext - with BeforeAndAfter { + with LocalSparkContext { import ExecutorAllocationManager._ import ExecutorAllocationManagerSuite._ private val contexts = new mutable.ListBuffer[SparkContext]() - before { + override def beforeEach(): Unit = { + super.beforeEach() contexts.clear() } - after { - contexts.foreach(_.stop()) + override def afterEach(): Unit = { + try { + contexts.foreach(_.stop()) + } finally { + super.afterEach() + } } private def post(bus: LiveListenerBus, event: SparkListenerEvent): Unit = { @@ -281,7 +285,7 @@ class ExecutorAllocationManagerSuite assert(totalRunningTasks(manager) === 0) } - test("cancel pending executors when no longer needed") { + testRetry("cancel pending executors when no longer needed") { sc = createSparkContext(0, 10, 0) val manager = sc.executorAllocationManager.get post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 5))) diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala index 051a13c9413ef..c45f104d0aa95 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala @@ -17,7 +17,10 @@ package org.apache.spark +import scala.concurrent.duration._ + import org.scalatest.Assertions +import org.scalatest.concurrent.Eventually._ import org.apache.spark.storage.StorageLevel @@ -58,10 +61,12 @@ class SparkContextInfoSuite extends SparkFunSuite with LocalSparkContext { test("getRDDStorageInfo only reports on RDDs that actually persist data") { sc = new SparkContext("local", "test") val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache() - assert(sc.getRDDStorageInfo.size === 0) + assert(sc.getRDDStorageInfo.length === 0) rdd.collect() sc.listenerBus.waitUntilEmpty(10000) - assert(sc.getRDDStorageInfo.size === 1) + 
eventually(timeout(10.seconds), interval(100.milliseconds)) { + assert(sc.getRDDStorageInfo.length === 1) + } assert(sc.getRDDStorageInfo.head.isCached) assert(sc.getRDDStorageInfo.head.memSize > 0) assert(sc.getRDDStorageInfo.head.storageLevel === StorageLevel.MEMORY_ONLY) diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala index 31289026b0027..ffb679ff0dc57 100644 --- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala @@ -20,7 +20,9 @@ package org.apache.spark // scalastyle:off import java.io.File -import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome} +import scala.annotation.tailrec + +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, BeforeAndAfterEach, FunSuite, Outcome} import org.apache.spark.internal.Logging import org.apache.spark.util.AccumulatorContext @@ -52,6 +54,7 @@ import org.apache.spark.util.AccumulatorContext abstract class SparkFunSuite extends FunSuite with BeforeAndAfterAll + with BeforeAndAfterEach with ThreadAudit with Logging { // scalastyle:on @@ -87,6 +90,47 @@ abstract class SparkFunSuite getTestResourceFile(file).getCanonicalPath } + /** + * Note: this method doesn't support `BeforeAndAfter`. You must use `BeforeAndAfterEach` to + * set up and tear down resources. + */ + def testRetry(s: String, n: Int = 2)(body: => Unit): Unit = { + test(s) { + retry(n) { + body + } + } + } + + /** + * Note: this method doesn't support `BeforeAndAfter`. You must use `BeforeAndAfterEach` to + * set up and tear down resources. + */ + def retry[T](n: Int)(body: => T): T = { + if (this.isInstanceOf[BeforeAndAfter]) { + throw new UnsupportedOperationException( + s"testRetry/retry cannot be used with ${classOf[BeforeAndAfter]}. " + + s"Please use ${classOf[BeforeAndAfterEach]} instead.") + } + retry0(n, n)(body) + } + + @tailrec private final def retry0[T](n: Int, n0: Int)(body: => T): T = { + try body + catch { case e: Throwable => + if (n > 0) { + logWarning(e.getMessage, e) + logInfo(s"\n\n===== RETRY #${n0 - n + 1} =====\n") + // Reset state before re-attempting in order so that tests which use patterns like + // LocalSparkContext to clean up state can work correctly when retried. + afterEach() + beforeEach() + retry0(n-1, n0)(body) + } + else throw e + } + } + /** * Log the suite name and the test name before and after each test. 
* diff --git a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala index a15ae040d43a9..75812ae02690d 100644 --- a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.JobExecutionStatus._ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkContext { - test("basic status API usage") { + testRetry("basic status API usage") { sc = new SparkContext("local", "test", new SparkConf(false)) val jobFuture = sc.parallelize(1 to 10000, 2).map(identity).groupBy(identity).collectAsync() val jobId: Int = eventually(timeout(10 seconds)) { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 6aad00b9bf05b..dc15da5ecbecf 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -34,7 +34,6 @@ import org.json4s.jackson.JsonMethods._ import org.mockito.ArgumentMatcher import org.mockito.Matchers.{any, argThat} import org.mockito.Mockito.{doThrow, mock, spy, verify, when} -import org.scalatest.BeforeAndAfter import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ @@ -48,16 +47,21 @@ import org.apache.spark.status.AppStatusStore import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils} -class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging { +class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { private var testDir: File = null - before { + override def beforeEach(): Unit = { + super.beforeEach() testDir = Utils.createTempDir(namePrefix = s"a b%20c+d") } - after { - Utils.deleteRecursively(testDir) + override def afterEach(): Unit = { + try { + Utils.deleteRecursively(testDir) + } finally { + super.afterEach() + } } /** Create a fake log file using the new log format used in Spark 1.3+ */ @@ -487,7 +491,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc provider.inSafeMode = false clock.setTime(10000) - eventually(timeout(1 second), interval(10 millis)) { + eventually(timeout(3.second), interval(10.milliseconds)) { provider.getConfig().keys should not contain ("HDFS State") } } finally { @@ -495,7 +499,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } } - test("provider reports error after FS leaves safe mode") { + testRetry("provider reports error after FS leaves safe mode") { testDir.delete() val clock = new ManualClock() val provider = new SafeModeTestProvider(createTestConf(), clock) @@ -505,7 +509,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc provider.inSafeMode = false clock.setTime(10000) - eventually(timeout(1 second), interval(10 millis)) { + eventually(timeout(3.second), interval(10.milliseconds)) { verify(errorHandler).uncaughtException(any(), any()) } } finally { diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala index 123f7f49d21b5..a6576e0d1c520 100644 --- 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala @@ -19,25 +19,23 @@ package org.apache.spark.scheduler import scala.collection.mutable -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} - import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite, TestUtils} import org.apache.spark.scheduler.cluster.ExecutorInfo /** * Unit tests for SparkListener that require a local cluster. */ -class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext - with BeforeAndAfter with BeforeAndAfterAll { +class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext { /** Length of time to wait while draining listener events. */ val WAIT_TIMEOUT_MILLIS = 10000 - before { + override def beforeEach(): Unit = { + super.beforeEach() sc = new SparkContext("local-cluster[2,1,1024]", "SparkListenerSuite") } - test("SparkListener sends executor added message") { + testRetry("SparkListener sends executor added message") { val listener = new SaveExecutorInfo sc.addSparkListener(listener) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala index 3a7913122dd83..48ce1780aeb9e 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala @@ -169,7 +169,7 @@ abstract class BaseYarnClusterSuite val handle = launcher.startApplication() try { - eventually(timeout(2 minutes), interval(1 second)) { + eventually(timeout(3.minutes), interval(1.second)) { assert(handle.getState().isFinal()) } } finally { diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 32e3d05500326..2c34b4e3781da 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -202,7 +202,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite { .startApplication() try { - eventually(timeout(30 seconds), interval(100 millis)) { + eventually(timeout(3.minutes), interval(100.milliseconds)) { handle.getState() should be (SparkAppHandle.State.RUNNING) } @@ -210,7 +210,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite { handle.getAppId() should startWith ("application_") handle.stop() - eventually(timeout(30 seconds), interval(100 millis)) { + eventually(timeout(3.minutes), interval(100.milliseconds)) { handle.getState() should be (SparkAppHandle.State.KILLED) } } finally { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index c5f3fe56ead8a..0091f8b4df7bd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -490,7 +490,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with } // Wait for listener to finish computing the metrics for the execution. 
- while (statusStore.executionsList().last.metricValues == null) { + while (statusStore.executionsList().isEmpty || + statusStore.executionsList().last.metricValues == null) { Thread.sleep(100) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index fb0b3656173ee..9f6553ed4474b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -195,7 +195,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { import testImplicits._ - override val streamingTimeout = 20.seconds + override val streamingTimeout = 80.seconds /** Use `format` and `path` to create FileStreamSource via DataFrameReader */ private def createFileStreamSource( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 35644c58cf795..7bd1320eae15f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -87,7 +87,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be protected val defaultUseV2Sink = false /** How long to wait for an active stream to catch up when checking a result. */ - val streamingTimeout = 10.seconds + val streamingTimeout = 60.seconds /** A trait for actions that can be performed while testing a streaming DataFrame. */ trait StreamAction diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala index 46eec736d4027..d17d0354aea28 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala @@ -36,21 +36,26 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.util.BlockingSource import org.apache.spark.util.Utils -class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter { +class StreamingQueryManagerSuite extends StreamTest { import AwaitTerminationTester._ import testImplicits._ override val streamingTimeout = 20.seconds - before { + override def beforeEach(): Unit = { + super.beforeEach() assert(spark.streams.active.isEmpty) spark.streams.resetTerminated() } - after { - assert(spark.streams.active.isEmpty) - spark.streams.resetTerminated() + override def afterEach(): Unit = { + try { + assert(spark.streams.active.isEmpty) + spark.streams.resetTerminated() + } finally { + super.afterEach() + } } testQuietly("listing") { @@ -84,7 +89,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter { } } - testQuietly("awaitAnyTermination without timeout and resetTerminated") { + testRetry("awaitAnyTermination without timeout and resetTerminated") { val datasets = Seq.fill(5)(makeDataset._2) withQueriesOn(datasets: _*) { queries => require(queries.size === datasets.size) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala index fc6218a33f741..33f93daac26e1 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala +++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala @@ -121,7 +121,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable { } // Verify that stopping actually stops the thread - failAfter(100 millis) { + failAfter(1.second) { receiver.stop("test") assert(receiver.isStopped) assert(!receiver.otherThread.isAlive)
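Most of the individual fixes in this third patch follow one of two patterns: poll asynchronous state with ScalaTest's eventually instead of asserting it at a single point in time, or wrap a known-flaky test in the new testRetry helper, which re-runs the body with afterEach()/beforeEach() invoked between attempts. A small hypothetical suite showing both patterns; the suite and test bodies are illustrative, while SparkFunSuite, LocalSparkContext and testRetry are the ones from Spark's own test tree:

import scala.concurrent.duration._

import org.scalatest.concurrent.Eventually._

import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}

// Hypothetical suite, for illustration only.
class FlakinessPatternsSuite extends SparkFunSuite with LocalSparkContext {

  test("poll asynchronous state instead of asserting it once") {
    sc = new SparkContext("local", "test")
    sc.parallelize(1 to 10, 2).cache().count()
    // Storage status is reported via the listener bus and may lag the action, so keep
    // retrying the assertion for up to 10 seconds rather than asserting immediately.
    eventually(timeout(10.seconds), interval(100.milliseconds)) {
      assert(sc.getRDDStorageInfo.length === 1)
    }
  }

  // testRetry re-runs the body on failure (calling afterEach/beforeEach in between), so it
  // may only be used by suites that set up state via BeforeAndAfterEach, never BeforeAndAfter.
  testRetry("occasionally flaky interaction") {
    sc = new SparkContext("local", "test")
    assert(sc.parallelize(1 to 100, 4).count() === 100)
  }
}

The remaining hunks, which raise hard-coded limits (streamingTimeout, failAfter, the YARN eventually timeouts), address the same underlying issue: wall-clock budgets that were comfortable under serial execution become too tight once up to four forked test groups compete for the same Jenkins worker.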