From 8a89fe3a31058116fdd0741a1c82d84e3f0ba682 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 20 Nov 2025 16:14:58 -0800 Subject: [PATCH 1/2] Don't kill tasks while a supervisor is stopping. Previously, if a supervisor was stopped while discovering tasks or updating their status, it could end up trying to kill those tasks because the callbacks on the requests to those tasks could fail as a result of the stopping. There should be no reason for an actively-stopping supervisor to kill a task, so this patch causes the shutdown functions to be no-ops while a supervisor is stopping. This patch also ensures that when a supervisor transitions into the STOPPING state, that state takes priority over any other state. --- .../supervisor/SeekableStreamSupervisor.java | 34 ++++++++++++++++--- .../supervisor/SupervisorStateManager.java | 4 ++- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index ef65d6d22caa..0f2839e7fba6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1744,7 +1744,7 @@ public void runInternal() checkIfStreamInactiveAndTurnSupervisorIdle(); // If supervisor is already stopping, don't contend for stateChangeLock since the block can be skipped - if (stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING)) { + if (isStopping()) { logDebugReport(); return; } @@ -1752,7 +1752,7 @@ public void runInternal() synchronized (stateChangeLock) { // if supervisor is not suspended, ensure required tasks are running // if suspended, ensure tasks have been requested to gracefully stop - if (stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING)) { + if (isStopping()) { // if we're already terminating, don't do anything here, the terminate already handles shutdown log.debug("Supervisor[%s] for datasource[%s] is already stopping.", supervisorId, dataSource); } else if (stateManager.isIdle()) { @@ -2024,7 +2024,16 @@ private void killTask(final String id, String reasonFormat, Object... args) { Optional taskQueue = taskMaster.getTaskQueue(); if (taskQueue.isPresent()) { - taskQueue.get().shutdown(id, reasonFormat, args); + if (isStopping()) { + log.debug( + "Not shutting down task[%s] because the supervisor[%s] has been stopped. Reason was[%s]", + id, + supervisorId, + StringUtils.format(reasonFormat, args) + ); + } else { + taskQueue.get().shutdown(id, reasonFormat, args); + } } else { log.error("Failed to get task queue because I'm not the leader!"); } @@ -2034,7 +2043,16 @@ private void killTaskWithSuccess(final String id, String reasonFormat, Object... { Optional taskQueue = taskMaster.getTaskQueue(); if (taskQueue.isPresent()) { - taskQueue.get().shutdownWithSuccess(id, reasonFormat, args); + if (isStopping()) { + log.debug( + "Not shutting down task[%s] because the supervisor[%s] has been stopped. Reason was[%s]", + id, + supervisorId, + StringUtils.format(reasonFormat, args) + ); + } else { + taskQueue.get().shutdownWithSuccess(id, reasonFormat, args); + } } else { log.error("Failed to get task queue because I'm not the leader!"); } @@ -4562,6 +4580,14 @@ private boolean checkOffsetAvailability( } } + /** + * Whether this supervisor is in a {@link SupervisorStateManager.BasicState#STOPPING} state. + */ + private boolean isStopping() + { + return stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING); + } + /** * Call {@link FutureUtils#coalesce} on the provided list, and wait for the result. */ diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java index f30517d1b225..497ad5f7c054 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java @@ -127,7 +127,9 @@ public SupervisorStateManager(SupervisorStateManagerConfig supervisorStateManage */ public synchronized void maybeSetState(State proposedState) { - if (BasicState.STOPPING.equals(this.supervisorState)) { + if (BasicState.STOPPING.equals(this.supervisorState) || BasicState.STOPPING.equals(proposedState)) { + // STOPPING takes precedence over all other states + supervisorState = proposedState; return; } From ef93c128a7ddd4516c7cef0347b740c38077f70b Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 20 Nov 2025 21:50:54 -0800 Subject: [PATCH 2/2] Fix logic. --- .../indexing/overlord/supervisor/SupervisorStateManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java index 497ad5f7c054..f64f390fc566 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java @@ -129,7 +129,7 @@ public synchronized void maybeSetState(State proposedState) { if (BasicState.STOPPING.equals(this.supervisorState) || BasicState.STOPPING.equals(proposedState)) { // STOPPING takes precedence over all other states - supervisorState = proposedState; + supervisorState = BasicState.STOPPING; return; }