Add a FAILED_TASKS executor roll policy (Spark on Kubernetes).

* Config.scala: adds FAILED_TASKS to the ExecutorRollPolicy enumeration and
  describes it in the spark.kubernetes.executor.rollPolicy doc string. The doc
  string also corrects "decomission" -> "decommission" and keeps a space after
  "executor summary." so the concatenated sentences do not run together.
* ExecutorRollPlugin.scala: the FAILED_TASKS case sorts listWithoutDriver by
  failedTasks and reverses it; the id of the head element is what gets chosen.
* ExecutorRollPluginSuite.scala: adds the execWithBiggestFailedTasks fixture
  (id "5") and a test expecting the FAILED_TASKS policy to pick Some("5").

NOTE(review): the index hashes below are carried over from the original patch;
regenerate the patch if exact blob ids matter.

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index ac0ffaf0291ca..34b70dbb1efd2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -147,18 +147,20 @@ private[spark] object Config extends Logging {
       .createWithDefault(0)
 
   object ExecutorRollPolicy extends Enumeration {
-    val ID, ADD_TIME, TOTAL_GC_TIME, TOTAL_DURATION = Value
+    val ID, ADD_TIME, TOTAL_GC_TIME, TOTAL_DURATION, FAILED_TASKS = Value
   }
 
   val EXECUTOR_ROLL_POLICY =
     ConfigBuilder("spark.kubernetes.executor.rollPolicy")
       .doc("Executor roll policy: Valid values are ID, ADD_TIME, TOTAL_GC_TIME (default), " +
-        "and TOTAL_DURATION. When executor roll happens, Spark uses this policy to choose " +
-        "an executor and decomission it. The built-in policies are based on executor summary." +
+        "TOTAL_DURATION, and FAILED_TASKS. " +
+        "When executor roll happens, Spark uses this policy to choose " +
+        "an executor and decommission it. The built-in policies are based on executor summary. " +
         "ID policy chooses an executor with the smallest executor ID. " +
         "ADD_TIME policy chooses an executor with the smallest add-time. " +
         "TOTAL_GC_TIME policy chooses an executor with the biggest total task GC time. " +
-        "TOTAL_DURATION policy chooses an executor with the biggest total task time. ")
+        "TOTAL_DURATION policy chooses an executor with the biggest total task time. " +
+        "FAILED_TASKS policy chooses an executor with the most number of failed tasks.")
       .version("3.3.0")
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
index 0149845c54624..bd7297755258f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
@@ -107,6 +107,8 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
         listWithoutDriver.sortBy(_.totalGCTime).reverse
       case ExecutorRollPolicy.TOTAL_DURATION =>
         listWithoutDriver.sortBy(_.totalDuration).reverse
+      case ExecutorRollPolicy.FAILED_TASKS =>
+        listWithoutDriver.sortBy(_.failedTasks).reverse
     }
     sortedList.headOption.map(_.id)
   }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala
index dd54641446e41..239456a902e24 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala
@@ -74,8 +74,17 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
     Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
     false, Set())
 
+  // The biggest failedTasks
+  val execWithBiggestFailedTasks = new ExecutorSummary("5", "host:port", true, 1,
+    10, 10, 1, 1, 1,
+    5, 0, 1, 100,
+    1, 100, 100,
+    10, false, 20, new Date(1639300003000L),
+    Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+    false, Set())
+
   val list = Seq(driverSummary, execWithSmallestID, execWithSmallestAddTime,
-    execWithBiggestTotalGCTime, execWithBiggestTotalDuration)
+    execWithBiggestTotalGCTime, execWithBiggestTotalDuration, execWithBiggestFailedTasks)
 
   test("Empty executor list") {
     ExecutorRollPolicy.values.foreach { value =>
@@ -112,4 +121,8 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
   test("Policy: TOTAL_DURATION") {
     assertEquals(Some("4"), plugin.invokePrivate(_choose(list, ExecutorRollPolicy.TOTAL_DURATION)))
   }
+
+  test("Policy: FAILED_TASKS") {
+    assertEquals(Some("5"), plugin.invokePrivate(_choose(list, ExecutorRollPolicy.FAILED_TASKS)))
+  }
 }