From 3a87f9c5659d5c532ce4988da01cc499dca2b79f Mon Sep 17 00:00:00 2001 From: Andreas Maechler Date: Tue, 4 Nov 2025 09:40:23 -0700 Subject: [PATCH 1/4] Do not clear pendingCompletionTaskGroups in clearAllocationInfo --- .../kafka/supervisor/KafkaSupervisorTest.java | 231 ++++++++++++++++++ .../supervisor/SeekableStreamSupervisor.java | 10 +- 2 files changed, 240 insertions(+), 1 deletion(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index f3f01cc5071c..5d8df8c55478 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -71,6 +71,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig; @@ -122,6 +123,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.lang.reflect.Method; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; @@ -134,6 +136,7 @@ import java.util.Map; import java.util.Properties; import java.util.TreeMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; @@ -5047,6 +5050,216 @@ public void test_doesTaskMatchSupervisor() EasyMock.replay(differentTaskType); } + /** + * Test that clearAllocationInfo() preserves pendingCompletionTaskGroups to prevent + * duplicate history entries during autoscaler scale-down operations. + *

+ * Bug: When autoscaler scales down, it calls gracefulShutdownInternal() which moves tasks + * to pendingCompletionTaskGroups, then calls clearAllocationInfo(). If clearAllocationInfo() + * clears pendingCompletionTaskGroups, the supervisor "forgets" about publishing tasks. + * During the next discoverTasks(), these tasks get rediscovered and re-added to + * activelyReadingTaskGroups, causing the autoscaler to repeatedly attempt scale-down + * and create duplicate history entries. + *

+ * Fix: clearAllocationInfo() must preserve pendingCompletionTaskGroups so the autoscaler's + * built-in check (DynamicAllocationTasksNotice.handle()) can skip scale actions while + * tasks are completing. + */ + @Test + public void test_clearAllocationInfo_preservesPendingCompletionTaskGroups() throws Exception + { + final TaskLocation location1 = TaskLocation.create("testHost1", 1234, -1); + final TaskLocation location2 = TaskLocation.create("testHost2", 1235, -1); + final TaskLocation location3 = TaskLocation.create("testHost3", 1236, -1); + final DateTime startTime = DateTimes.nowUtc(); + + // Create supervisor with 3 task groups + supervisor = getTestableSupervisor(1, 3, true, "PT1H", null, null); + final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig(); + addSomeEvents(100); + + // Manually create 3 tasks (one for each partition/task group) + Task task1 = createKafkaIndexTask( + "task1", + DATASOURCE, + 0, + new SeekableStreamStartSequenceNumbers<>( + topic, + singlePartitionMap(topic, 0, 0L), + ImmutableSet.of() + ), + new SeekableStreamEndSequenceNumbers<>( + topic, + singlePartitionMap(topic, 0, Long.MAX_VALUE) + ), + null, + null, + tuningConfig + ); + + Task task2 = createKafkaIndexTask( + "task2", + DATASOURCE, + 1, + new SeekableStreamStartSequenceNumbers<>( + topic, + singlePartitionMap(topic, 1, 0L), + ImmutableSet.of() + ), + new SeekableStreamEndSequenceNumbers<>( + topic, + singlePartitionMap(topic, 1, Long.MAX_VALUE) + ), + null, + null, + tuningConfig + ); + + Task task3 = createKafkaIndexTask( + "task3", + DATASOURCE, + 2, + new SeekableStreamStartSequenceNumbers<>( + topic, + singlePartitionMap(topic, 2, 0L), + ImmutableSet.of() + ), + new SeekableStreamEndSequenceNumbers<>( + topic, + singlePartitionMap(topic, 2, Long.MAX_VALUE) + ), + null, + null, + tuningConfig + ); + + Collection workItems = new ArrayList<>(); + workItems.add(new TestTaskRunnerWorkItem(task1, null, location1)); + workItems.add(new TestTaskRunnerWorkItem(task2, null, location2)); + workItems.add(new TestTaskRunnerWorkItem(task3, null, location3)); + + // Setup mocks for initial discovery of tasks + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(task1.getId())).andReturn(location1).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(task2.getId())).andReturn(location2).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(task3.getId())).andReturn(location3).anyTimes(); + EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(toMap(task1, task2, task3)).anyTimes(); + EasyMock.expect(taskStorage.getStatus(task1.getId())).andReturn(Optional.of(TaskStatus.running(task1.getId()))).anyTimes(); + EasyMock.expect(taskStorage.getStatus(task2.getId())).andReturn(Optional.of(TaskStatus.running(task2.getId()))).anyTimes(); + EasyMock.expect(taskStorage.getStatus(task3.getId())).andReturn(Optional.of(TaskStatus.running(task3.getId()))).anyTimes(); + EasyMock.expect(taskStorage.getTask(task1.getId())).andReturn(Optional.of(task1)).anyTimes(); + EasyMock.expect(taskStorage.getTask(task2.getId())).andReturn(Optional.of(task2)).anyTimes(); + EasyMock.expect(taskStorage.getTask(task3.getId())).andReturn(Optional.of(task3)).anyTimes(); + EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)).anyTimes(); + EasyMock.expect(taskClient.getStatusAsync(task1.getId())).andReturn(Futures.immediateFuture(Status.READING)); + EasyMock.expect(taskClient.getStatusAsync(task2.getId())).andReturn(Futures.immediateFuture(Status.READING)); + EasyMock.expect(taskClient.getStatusAsync(task3.getId())).andReturn(Futures.immediateFuture(Status.READING)); + EasyMock.expect(taskClient.getStartTimeAsync(task1.getId())).andReturn(Futures.immediateFuture(startTime)); + EasyMock.expect(taskClient.getStartTimeAsync(task2.getId())).andReturn(Futures.immediateFuture(startTime)); + EasyMock.expect(taskClient.getStartTimeAsync(task3.getId())).andReturn(Futures.immediateFuture(startTime)); + + TreeMap> checkpoints1 = new TreeMap<>(); + checkpoints1.put(0, singlePartitionMap(topic, 0, 0L)); + TreeMap> checkpoints2 = new TreeMap<>(); + checkpoints2.put(0, singlePartitionMap(topic, 1, 0L)); + TreeMap> checkpoints3 = new TreeMap<>(); + checkpoints3.put(0, singlePartitionMap(topic, 2, 0L)); + + EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.eq(task1.getId()), EasyMock.anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(1); + EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.eq(task2.getId()), EasyMock.anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .times(1); + EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.eq(task3.getId()), EasyMock.anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints3)) + .times(1); + + taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); + replayAll(); + + // Start supervisor and discover the 3 existing tasks + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + + // Verify we have 3 actively reading task groups after discovery + Assert.assertEquals( + "Should have 3 actively reading task groups after discovery", + 3, + supervisor.getActivelyReadingTaskGroupsCount() + ); + + // Reset and setup mocks for gracefulShutdownInternal + EasyMock.reset(taskRunner, taskClient, taskQueue); + EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(task1.getId())).andReturn(location1).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(task2.getId())).andReturn(location2).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(task3.getId())).andReturn(location3).anyTimes(); + EasyMock.expect(taskClient.pauseAsync(task1.getId())).andReturn(Futures.immediateFuture(singlePartitionMap(topic, 0, 100L))); + EasyMock.expect(taskClient.pauseAsync(task2.getId())).andReturn(Futures.immediateFuture(singlePartitionMap(topic, 1, 100L))); + EasyMock.expect(taskClient.pauseAsync(task3.getId())).andReturn(Futures.immediateFuture(singlePartitionMap(topic, 2, 100L))); + EasyMock.expect(taskClient.setEndOffsetsAsync(EasyMock.eq(task1.getId()), EasyMock.anyObject(), EasyMock.eq(true))).andReturn(Futures.immediateFuture(true)); + EasyMock.expect(taskClient.setEndOffsetsAsync(EasyMock.eq(task2.getId()), EasyMock.anyObject(), EasyMock.eq(true))).andReturn(Futures.immediateFuture(true)); + EasyMock.expect(taskClient.setEndOffsetsAsync(EasyMock.eq(task3.getId()), EasyMock.anyObject(), EasyMock.eq(true))).andReturn(Futures.immediateFuture(true)); + + EasyMock.replay(taskRunner, taskClient, taskQueue); + + // Simulate autoscaler scale-down by calling gracefulShutdownInternal() + // This should move tasks from activelyReadingTaskGroups to pendingCompletionTaskGroups + supervisor.gracefulShutdownInternal(); + + verifyAll(); + + // After gracefulShutdownInternal, tasks should be moved to pendingCompletionTaskGroups + Assert.assertEquals( + "activelyReadingTaskGroups should be empty after gracefulShutdownInternal", + 0, + supervisor.getActivelyReadingTaskGroupsCount() + ); + + // Verify pendingCompletionTaskGroups is NOT empty (tasks were moved there) + boolean hasPendingTasks = false; + for (int groupId = 0; groupId < 3; groupId++) { + if (supervisor.getPendingCompletionTaskGroupsCount(groupId) > 0) { + hasPendingTasks = true; + break; + } + } + Assert.assertTrue( + "pendingCompletionTaskGroups should contain task groups after gracefulShutdownInternal", + hasPendingTasks + ); + + // Now call clearAllocationInfo() - this is where the bug was + // The bug was that this method cleared pendingCompletionTaskGroups + supervisor.testClearAllocationInfo(); + + // THE KEY ASSERTION: Verify pendingCompletionTaskGroups is still NOT empty after clearAllocationInfo + // This is the fix - clearAllocationInfo should preserve pendingCompletionTaskGroups + boolean stillHasPendingTasks = false; + for (int groupId = 0; groupId < 3; groupId++) { + if (supervisor.getPendingCompletionTaskGroupsCount(groupId) > 0) { + stillHasPendingTasks = true; + break; + } + } + Assert.assertTrue( + "pendingCompletionTaskGroups should be preserved after clearAllocationInfo() " + + "to prevent autoscaler from creating duplicate history entries", + stillHasPendingTasks + ); + + // Verify activelyReadingTaskGroups is still empty + Assert.assertEquals( + "activelyReadingTaskGroups should remain empty after clearAllocationInfo", + 0, + supervisor.getActivelyReadingTaskGroupsCount() + ); + } + private void addSomeEvents(int numEventsPerPartition) throws Exception { // create topic manually @@ -5753,6 +5966,24 @@ private SeekableStreamSupervisorStateManager getStateManager() { return stateManager; } + + public int getActivelyReadingTaskGroupsCount() + { + return getActiveTaskGroupsCount(); + } + + public int getPendingCompletionTaskGroupsCount(int groupId) + { + CopyOnWriteArrayList groups = getPendingCompletionTaskGroups(groupId); + return groups != null ? groups.size() : 0; + } + + public void testClearAllocationInfo() throws Exception + { + Method method = SeekableStreamSupervisor.class.getDeclaredMethod("clearAllocationInfo"); + method.setAccessible(true); + method.invoke(this); + } } private static class TestableKafkaSupervisorWithCustomIsTaskCurrent extends TestableKafkaSupervisor diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 0faa0b841346..9e5fde67583f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -614,7 +614,15 @@ private void clearAllocationInfo() partitionGroups.clear(); partitionOffsets.clear(); - pendingCompletionTaskGroups.clear(); + // Note: We intentionally do NOT clear pendingCompletionTaskGroups here. + // When the autoscaler calls this method after gracefulShutdownInternal(), tasks have been moved to + // pendingCompletionTaskGroups and are transitioning from READING -> PUBLISHING state. + // If we clear pendingCompletionTaskGroups, the supervisor "forgets" about these publishing tasks. + // Then, during the next discoverTasks(), if tasks haven't transitioned yet, they get re-added to + // activelyReadingTaskGroups, causing the autoscaler to repeatedly attempt scale-down and create + // duplicate supervisor history entries. By preserving pendingCompletionTaskGroups, the autoscaler's + // check at DynamicAllocationTasksNotice.handle() will correctly skip scale actions until tasks complete. + partitionIds.clear(); } From aec7de9aa7dc8285b0579c6eb0a95aa028097f37 Mon Sep 17 00:00:00 2001 From: Andreas Maechler Date: Thu, 6 Nov 2025 15:28:16 -0700 Subject: [PATCH 2/4] Better descriptions --- .../kafka/supervisor/KafkaSupervisorTest.java | 19 ++----------------- .../supervisor/SeekableStreamSupervisor.java | 15 ++++++++------- 2 files changed, 10 insertions(+), 24 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 5d8df8c55478..a53ad883e1e0 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -5049,24 +5049,9 @@ public void test_doesTaskMatchSupervisor() EasyMock.expect(differentTaskType.getSupervisorId()).andReturn("supervisorId"); EasyMock.replay(differentTaskType); } - - /** - * Test that clearAllocationInfo() preserves pendingCompletionTaskGroups to prevent - * duplicate history entries during autoscaler scale-down operations. - *

- * Bug: When autoscaler scales down, it calls gracefulShutdownInternal() which moves tasks - * to pendingCompletionTaskGroups, then calls clearAllocationInfo(). If clearAllocationInfo() - * clears pendingCompletionTaskGroups, the supervisor "forgets" about publishing tasks. - * During the next discoverTasks(), these tasks get rediscovered and re-added to - * activelyReadingTaskGroups, causing the autoscaler to repeatedly attempt scale-down - * and create duplicate history entries. - *

- * Fix: clearAllocationInfo() must preserve pendingCompletionTaskGroups so the autoscaler's - * built-in check (DynamicAllocationTasksNotice.handle()) can skip scale actions while - * tasks are completing. - */ + @Test - public void test_clearAllocationInfo_preservesPendingCompletionTaskGroups() throws Exception + public void test_autoScaler_doesNotRepeatScaleDownActions_ifTasksAreStillPublishing() throws Exception { final TaskLocation location1 = TaskLocation.create("testHost1", 1234, -1); final TaskLocation location2 = TaskLocation.create("testHost2", 1235, -1); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 9e5fde67583f..5ef939cc8e0c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -608,6 +608,14 @@ private void changeTaskCountInIOConfig(int desiredActiveTaskCount) } } + /** + * Clears allocation information including active task groups, partition groups, partition offsets, and partition IDs. + *

+ * Note: Does not clear {@link #pendingCompletionTaskGroups} so that the supervisor remembers that these + * tasks are publishing and auto-scaler does not repeatedly attempt a scale down until these tasks + * complete. If this is cleared, the next {@link #discoverTasks()} might add these tasks to + * {@link #activelyReadingTaskGroups}. + */ private void clearAllocationInfo() { activelyReadingTaskGroups.clear(); @@ -615,13 +623,6 @@ private void clearAllocationInfo() partitionOffsets.clear(); // Note: We intentionally do NOT clear pendingCompletionTaskGroups here. - // When the autoscaler calls this method after gracefulShutdownInternal(), tasks have been moved to - // pendingCompletionTaskGroups and are transitioning from READING -> PUBLISHING state. - // If we clear pendingCompletionTaskGroups, the supervisor "forgets" about these publishing tasks. - // Then, during the next discoverTasks(), if tasks haven't transitioned yet, they get re-added to - // activelyReadingTaskGroups, causing the autoscaler to repeatedly attempt scale-down and create - // duplicate supervisor history entries. By preserving pendingCompletionTaskGroups, the autoscaler's - // check at DynamicAllocationTasksNotice.handle() will correctly skip scale actions until tasks complete. partitionIds.clear(); } From a4d06dcda1b4775d88b01377c495fe3a5175f6e3 Mon Sep 17 00:00:00 2001 From: Andreas Maechler Date: Fri, 7 Nov 2025 04:13:33 -0700 Subject: [PATCH 3/4] Whitespace --- .../druid/indexing/kafka/supervisor/KafkaSupervisorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index a53ad883e1e0..f6a026e6aa79 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -5049,7 +5049,7 @@ public void test_doesTaskMatchSupervisor() EasyMock.expect(differentTaskType.getSupervisorId()).andReturn("supervisorId"); EasyMock.replay(differentTaskType); } - + @Test public void test_autoScaler_doesNotRepeatScaleDownActions_ifTasksAreStillPublishing() throws Exception { From 88c11aef186d761c241f85757f5e753aa5087761 Mon Sep 17 00:00:00 2001 From: Andreas Maechler Date: Fri, 7 Nov 2025 08:48:16 -0700 Subject: [PATCH 4/4] Remove comment --- .../seekablestream/supervisor/SeekableStreamSupervisor.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 5ef939cc8e0c..ef65d6d22caa 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -621,9 +621,6 @@ private void clearAllocationInfo() activelyReadingTaskGroups.clear(); partitionGroups.clear(); partitionOffsets.clear(); - - // Note: We intentionally do NOT clear pendingCompletionTaskGroups here. - partitionIds.clear(); }