From c477ec4e03add751a33ea2d0e7a6fa9b49f9eedd Mon Sep 17 00:00:00 2001
From: Sumeet Gajjar
Date: Mon, 5 Apr 2021 17:32:43 -0500
Subject: [PATCH] [SPARK-34949][CORE] Prevent BlockManager reregister when
 Executor is shutting down

### What changes were proposed in this pull request?
This PR prevents reregistering BlockManager when an Executor is shutting down. It is achieved by checking `executorShutdown` before calling `env.blockManager.reregister()`.

### Why are the changes needed?
This change is required since Spark reports executors as active even after they are removed.
I was testing Dynamic Allocation on K8s with about 300 executors. When the executors were torn down due to `spark.dynamicAllocation.executorIdleTimeout`, all the executor pods were removed from K8s; however, under the "Executors" tab in the Spark UI, some executors were still listed as alive.
[spark.sparkContext.statusTracker.getExecutorInfos.length](https://github.com/apache/spark/blob/65da9287bc5112564836a555cd2967fc6b05856f/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala#L105) also returned a value greater than 1.
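For context, a minimal sketch of one way such stale entries can be observed from the driver; the app name, dataset size, and conf values below are placeholders for illustration and are not part of this patch:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sketch: run a shuffle-heavy job with dynamic allocation
// enabled, wait past the idle timeout so executors are removed, then
// inspect the status tracker. All conf values here are placeholders.
val spark = SparkSession.builder()
  .appName("stale-executor-check")  // hypothetical app name
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .getOrCreate()

spark.range(0L, 10000000L).repartition(200).count()  // triggers executor scale-up

Thread.sleep(120 * 1000L)  // wait past the idle timeout

// Without the fix, this count can stay inflated because a late heartbeat
// re-registers the BlockManager of an executor that is already shutting down.
println(spark.sparkContext.statusTracker.getExecutorInfos.length)
```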
### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a new test.
Following are the logs of the executor (id: 303) which re-registers `BlockManager`:
```
21/04/02 21:33:28 INFO CoarseGrainedExecutorBackend: Got assigned task 1076
21/04/02 21:33:28 INFO Executor: Running task 4.0 in stage 3.0 (TID 1076)
21/04/02 21:33:28 INFO MapOutputTrackerWorker: Updating epoch to 302 and clearing cache
21/04/02 21:33:28 INFO TorrentBroadcast: Started reading broadcast variable 3
21/04/02 21:33:28 INFO TransportClientFactory: Successfully created connection to /100.100.195.227:33703 after 76 ms (62 ms spent in bootstraps)
21/04/02 21:33:28 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.4 KB, free 168.0 MB)
21/04/02 21:33:28 INFO TorrentBroadcast: Reading broadcast variable 3 took 168 ms
21/04/02 21:33:28 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.9 KB, free 168.0 MB)
21/04/02 21:33:29 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 1, fetching them
21/04/02 21:33:29 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTrackerda-lite-test-4-7a57e478947d206d-driver-svc.dex-app-n5ttnbmg.svc:7078)
21/04/02 21:33:29 INFO MapOutputTrackerWorker: Got the output locations
21/04/02 21:33:29 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks including 1 local blocks and 1 remote blocks
21/04/02 21:33:30 INFO TransportClientFactory: Successfully created connection to /100.100.80.103:40971 after 660 ms (528 ms spent in bootstraps)
21/04/02 21:33:30 INFO ShuffleBlockFetcherIterator: Started 1 remote fetches in 1042 ms
21/04/02 21:33:31 INFO Executor: Finished task 4.0 in stage 3.0 (TID 1076). 1276 bytes result sent to driver
.
.
.
21/04/02 21:34:16 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
21/04/02 21:34:16 INFO Executor: Told to re-register on heartbeat
21/04/02 21:34:16 INFO BlockManager: BlockManager BlockManagerId(303, 100.100.122.34, 41265, None) re-registering with master
21/04/02 21:34:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(303, 100.100.122.34, 41265, None)
21/04/02 21:34:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(303, 100.100.122.34, 41265, None)
21/04/02 21:34:16 INFO BlockManager: Reporting 0 blocks to the master.
21/04/02 21:34:16 INFO MemoryStore: MemoryStore cleared
21/04/02 21:34:16 INFO BlockManager: BlockManager stopped
21/04/02 21:34:16 INFO FileDataSink: Closing sink with output file = /tmp/safari-events/.des_analysis/safari-events/hdp_spark_monitoring_random-container-037caf27-6c77-433f-820f-03cd9c7d9b6e-spark-8a492407d60b401bbf4309a14ea02ca2_events.tsv
21/04/02 21:34:16 INFO HonestProfilerBasedThreadSnapshotProvider: Stopping agent
21/04/02 21:34:16 INFO HonestProfilerHandler: Stopping honest profiler agent
21/04/02 21:34:17 INFO ShutdownHookManager: Shutdown hook called
21/04/02 21:34:17 INFO ShutdownHookManager: Deleting directory /var/data/spark-d886588c-2a7e-491d-bbcb-4f58b3e31001/spark-4aa337a0-60c0-45da-9562-8c50eaff3cea
```

Closes #32043 from sumeetgajjar/SPARK-34949.

Authored-by: Sumeet Gajjar
Signed-off-by: Mridul Muralidharan gmail.com>
---
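For illustration only, a minimal, self-contained sketch of the shutdown-guarded re-registration pattern this patch applies; `HeartbeatLoop`, `BlockManagerLike`, and `Response` are made-up stand-ins for the example, not Spark types:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Stand-in for the real BlockManager; only the call relevant here is modeled.
trait BlockManagerLike { def reregister(): Unit }

// Stand-in for HeartbeatResponse.
final case class Response(reregisterBlockManager: Boolean)

class HeartbeatLoop(blockManager: BlockManagerLike) {
  // Mirrors Executor.executorShutdown: flipped once stop() begins.
  private val shutdown = new AtomicBoolean(false)

  def stop(): Unit = shutdown.set(true)

  def onHeartbeatResponse(response: Response): Unit = {
    // The fix: an executor that is shutting down must not re-register its
    // BlockManager, otherwise the driver keeps reporting it as alive.
    if (!shutdown.get && response.reregisterBlockManager) {
      blockManager.reregister()
    }
  }
}
```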
 .../org/apache/spark/executor/Executor.scala  |  2 +-
 .../apache/spark/executor/ExecutorSuite.scala | 68 ++++++++++++++-----
 2 files changed, 53 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 085f32b8d83b4..2c7f946f33ef1 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -929,7 +929,7 @@ private[spark] class Executor(
     try {
       val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
         message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis, EXECUTOR_HEARTBEAT_INTERVAL.key))
-      if (response.reregisterBlockManager) {
+      if (!executorShutdown.get && response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
       }
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 6b3df6d0c9970..665f553d29819 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -270,6 +270,17 @@ class ExecutorSuite extends SparkFunSuite
     heartbeatZeroAccumulatorUpdateTest(false)
   }
 
+  private def withMockHeartbeatReceiverRef(executor: Executor)
+      (func: RpcEndpointRef => Unit): Unit = {
+    val executorClass = classOf[Executor]
+    val mockReceiverRef = mock[RpcEndpointRef]
+    val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef")
+    receiverRef.setAccessible(true)
+    receiverRef.set(executor, mockReceiverRef)
+
+    func(mockReceiverRef)
+  }
+
   private def withHeartbeatExecutor(confs: (String, String)*)
       (f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = {
     val conf = new SparkConf
@@ -277,22 +288,18 @@ class ExecutorSuite extends SparkFunSuite
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     withExecutor("id", "localhost", SparkEnv.get) { executor =>
-      val executorClass = classOf[Executor]
-
-      // Save all heartbeats sent into an ArrayBuffer for verification
-      val heartbeats = ArrayBuffer[Heartbeat]()
-      val mockReceiver = mock[RpcEndpointRef]
-      when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any))
-        .thenAnswer((invocation: InvocationOnMock) => {
-          val args = invocation.getArguments()
-          heartbeats += args(0).asInstanceOf[Heartbeat]
-          HeartbeatResponse(false)
-        })
-      val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef")
-      receiverRef.setAccessible(true)
-      receiverRef.set(executor, mockReceiver)
-
-      f(executor, heartbeats)
+      withMockHeartbeatReceiverRef(executor) { mockReceiverRef =>
+        // Save all heartbeats sent into an ArrayBuffer for verification
+        val heartbeats = ArrayBuffer[Heartbeat]()
+        when(mockReceiverRef.askSync(any[Heartbeat], any[RpcTimeout])(any))
+          .thenAnswer((invocation: InvocationOnMock) => {
+            val args = invocation.getArguments()
+            heartbeats += args(0).asInstanceOf[Heartbeat]
+            HeartbeatResponse(false)
+          })
+
+        f(executor, heartbeats)
+      }
     }
   }
 
@@ -416,6 +423,35 @@ class ExecutorSuite extends SparkFunSuite
     assert(taskMetrics.getMetricValue("JVMHeapMemory") > 0)
   }
 
+  test("SPARK-34949: do not re-register BlockManager when executor is shutting down") {
+    val reregisterInvoked = new AtomicBoolean(false)
+    val mockBlockManager = mock[BlockManager]
+    when(mockBlockManager.reregister()).thenAnswer { (_: InvocationOnMock) =>
+      reregisterInvoked.getAndSet(true)
+    }
+    val conf = new SparkConf(false).setAppName("test").setMaster("local[2]")
+    val mockEnv = createMockEnv(conf, new JavaSerializer(conf))
+    when(mockEnv.blockManager).thenReturn(mockBlockManager)
+
+    withExecutor("id", "localhost", mockEnv) { executor =>
+      withMockHeartbeatReceiverRef(executor) { mockReceiverRef =>
+        when(mockReceiverRef.askSync(any[Heartbeat], any[RpcTimeout])(any)).thenAnswer {
+          (_: InvocationOnMock) => HeartbeatResponse(reregisterBlockManager = true)
+        }
+        val reportHeartbeat = PrivateMethod[Unit](Symbol("reportHeartBeat"))
+        executor.invokePrivate(reportHeartbeat())
+        assert(reregisterInvoked.get(), "BlockManager.reregister should be invoked " +
+          "on HeartbeatResponse(reregisterBlockManager = true) when executor is not shutting down")
+
+        reregisterInvoked.getAndSet(false)
+        executor.stop()
+        executor.invokePrivate(reportHeartbeat())
+        assert(!reregisterInvoked.get(),
+          "BlockManager.reregister should not be invoked when executor is shutting down")
+      }
+    }
+  }
+
   private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = {
     val mockEnv = mock[SparkEnv]
     val mockRpcEnv = mock[RpcEnv]