diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index f3f01cc5071c..f6a026e6aa79 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -71,6 +71,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig; @@ -122,6 +123,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.lang.reflect.Method; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; @@ -134,6 +136,7 @@ import java.util.Map; import java.util.Properties; import java.util.TreeMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; @@ -5047,6 +5050,201 @@ public void test_doesTaskMatchSupervisor() EasyMock.replay(differentTaskType); } + @Test + public void test_autoScaler_doesNotRepeatScaleDownActions_ifTasksAreStillPublishing() throws Exception + { + final TaskLocation location1 = TaskLocation.create("testHost1", 1234, -1); + final TaskLocation location2 = TaskLocation.create("testHost2", 1235, -1); + final TaskLocation location3 = TaskLocation.create("testHost3", 1236, -1); + final DateTime startTime = DateTimes.nowUtc(); + + // Create supervisor with 3 task groups + supervisor = getTestableSupervisor(1, 3, true, "PT1H", null, null); + final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig(); + addSomeEvents(100); + + // Manually create 3 tasks (one for each partition/task group) + Task task1 = createKafkaIndexTask( + "task1", + DATASOURCE, + 0, + new SeekableStreamStartSequenceNumbers<>( + topic, + singlePartitionMap(topic, 0, 0L), + ImmutableSet.of() + ), + new SeekableStreamEndSequenceNumbers<>( + topic, + singlePartitionMap(topic, 0, Long.MAX_VALUE) + ), + null, + null, + tuningConfig + ); + + Task task2 = createKafkaIndexTask( + "task2", + DATASOURCE, + 1, + new SeekableStreamStartSequenceNumbers<>( + topic, + singlePartitionMap(topic, 1, 0L), + ImmutableSet.of() + ), + new SeekableStreamEndSequenceNumbers<>( + topic, + singlePartitionMap(topic, 1, Long.MAX_VALUE) + ), + null, + null, + tuningConfig + ); + + Task task3 = createKafkaIndexTask( + "task3", + DATASOURCE, + 2, + new SeekableStreamStartSequenceNumbers<>( + topic, + singlePartitionMap(topic, 2, 0L), + ImmutableSet.of() + ), + new SeekableStreamEndSequenceNumbers<>( + topic, + singlePartitionMap(topic, 2, Long.MAX_VALUE) + ), + null, + null, + tuningConfig + ); + + Collection workItems = new ArrayList<>(); + workItems.add(new TestTaskRunnerWorkItem(task1, null, location1)); + workItems.add(new TestTaskRunnerWorkItem(task2, null, location2)); + workItems.add(new TestTaskRunnerWorkItem(task3, null, location3)); + + // Setup mocks for initial discovery of tasks + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(task1.getId())).andReturn(location1).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(task2.getId())).andReturn(location2).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(task3.getId())).andReturn(location3).anyTimes(); + EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(toMap(task1, task2, task3)).anyTimes(); + EasyMock.expect(taskStorage.getStatus(task1.getId())).andReturn(Optional.of(TaskStatus.running(task1.getId()))).anyTimes(); + EasyMock.expect(taskStorage.getStatus(task2.getId())).andReturn(Optional.of(TaskStatus.running(task2.getId()))).anyTimes(); + EasyMock.expect(taskStorage.getStatus(task3.getId())).andReturn(Optional.of(TaskStatus.running(task3.getId()))).anyTimes(); + EasyMock.expect(taskStorage.getTask(task1.getId())).andReturn(Optional.of(task1)).anyTimes(); + EasyMock.expect(taskStorage.getTask(task2.getId())).andReturn(Optional.of(task2)).anyTimes(); + EasyMock.expect(taskStorage.getTask(task3.getId())).andReturn(Optional.of(task3)).anyTimes(); + EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)).anyTimes(); + EasyMock.expect(taskClient.getStatusAsync(task1.getId())).andReturn(Futures.immediateFuture(Status.READING)); + EasyMock.expect(taskClient.getStatusAsync(task2.getId())).andReturn(Futures.immediateFuture(Status.READING)); + EasyMock.expect(taskClient.getStatusAsync(task3.getId())).andReturn(Futures.immediateFuture(Status.READING)); + EasyMock.expect(taskClient.getStartTimeAsync(task1.getId())).andReturn(Futures.immediateFuture(startTime)); + EasyMock.expect(taskClient.getStartTimeAsync(task2.getId())).andReturn(Futures.immediateFuture(startTime)); + EasyMock.expect(taskClient.getStartTimeAsync(task3.getId())).andReturn(Futures.immediateFuture(startTime)); + + TreeMap> checkpoints1 = new TreeMap<>(); + checkpoints1.put(0, singlePartitionMap(topic, 0, 0L)); + TreeMap> checkpoints2 = new TreeMap<>(); + checkpoints2.put(0, singlePartitionMap(topic, 1, 0L)); + TreeMap> checkpoints3 = new TreeMap<>(); + checkpoints3.put(0, singlePartitionMap(topic, 2, 0L)); + + EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.eq(task1.getId()), EasyMock.anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(1); + EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.eq(task2.getId()), EasyMock.anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .times(1); + EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.eq(task3.getId()), EasyMock.anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints3)) + .times(1); + + taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); + replayAll(); + + // Start supervisor and discover the 3 existing tasks + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + + // Verify we have 3 actively reading task groups after discovery + Assert.assertEquals( + "Should have 3 actively reading task groups after discovery", + 3, + supervisor.getActivelyReadingTaskGroupsCount() + ); + + // Reset and setup mocks for gracefulShutdownInternal + EasyMock.reset(taskRunner, taskClient, taskQueue); + EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(task1.getId())).andReturn(location1).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(task2.getId())).andReturn(location2).anyTimes(); + EasyMock.expect(taskRunner.getTaskLocation(task3.getId())).andReturn(location3).anyTimes(); + EasyMock.expect(taskClient.pauseAsync(task1.getId())).andReturn(Futures.immediateFuture(singlePartitionMap(topic, 0, 100L))); + EasyMock.expect(taskClient.pauseAsync(task2.getId())).andReturn(Futures.immediateFuture(singlePartitionMap(topic, 1, 100L))); + EasyMock.expect(taskClient.pauseAsync(task3.getId())).andReturn(Futures.immediateFuture(singlePartitionMap(topic, 2, 100L))); + EasyMock.expect(taskClient.setEndOffsetsAsync(EasyMock.eq(task1.getId()), EasyMock.anyObject(), EasyMock.eq(true))).andReturn(Futures.immediateFuture(true)); + EasyMock.expect(taskClient.setEndOffsetsAsync(EasyMock.eq(task2.getId()), EasyMock.anyObject(), EasyMock.eq(true))).andReturn(Futures.immediateFuture(true)); + EasyMock.expect(taskClient.setEndOffsetsAsync(EasyMock.eq(task3.getId()), EasyMock.anyObject(), EasyMock.eq(true))).andReturn(Futures.immediateFuture(true)); + + EasyMock.replay(taskRunner, taskClient, taskQueue); + + // Simulate autoscaler scale-down by calling gracefulShutdownInternal() + // This should move tasks from activelyReadingTaskGroups to pendingCompletionTaskGroups + supervisor.gracefulShutdownInternal(); + + verifyAll(); + + // After gracefulShutdownInternal, tasks should be moved to pendingCompletionTaskGroups + Assert.assertEquals( + "activelyReadingTaskGroups should be empty after gracefulShutdownInternal", + 0, + supervisor.getActivelyReadingTaskGroupsCount() + ); + + // Verify pendingCompletionTaskGroups is NOT empty (tasks were moved there) + boolean hasPendingTasks = false; + for (int groupId = 0; groupId < 3; groupId++) { + if (supervisor.getPendingCompletionTaskGroupsCount(groupId) > 0) { + hasPendingTasks = true; + break; + } + } + Assert.assertTrue( + "pendingCompletionTaskGroups should contain task groups after gracefulShutdownInternal", + hasPendingTasks + ); + + // Now call clearAllocationInfo() - this is where the bug was + // The bug was that this method cleared pendingCompletionTaskGroups + supervisor.testClearAllocationInfo(); + + // THE KEY ASSERTION: Verify pendingCompletionTaskGroups is still NOT empty after clearAllocationInfo + // This is the fix - clearAllocationInfo should preserve pendingCompletionTaskGroups + boolean stillHasPendingTasks = false; + for (int groupId = 0; groupId < 3; groupId++) { + if (supervisor.getPendingCompletionTaskGroupsCount(groupId) > 0) { + stillHasPendingTasks = true; + break; + } + } + Assert.assertTrue( + "pendingCompletionTaskGroups should be preserved after clearAllocationInfo() " + + "to prevent autoscaler from creating duplicate history entries", + stillHasPendingTasks + ); + + // Verify activelyReadingTaskGroups is still empty + Assert.assertEquals( + "activelyReadingTaskGroups should remain empty after clearAllocationInfo", + 0, + supervisor.getActivelyReadingTaskGroupsCount() + ); + } + private void addSomeEvents(int numEventsPerPartition) throws Exception { // create topic manually @@ -5753,6 +5951,24 @@ private SeekableStreamSupervisorStateManager getStateManager() { return stateManager; } + + public int getActivelyReadingTaskGroupsCount() + { + return getActiveTaskGroupsCount(); + } + + public int getPendingCompletionTaskGroupsCount(int groupId) + { + CopyOnWriteArrayList groups = getPendingCompletionTaskGroups(groupId); + return groups != null ? groups.size() : 0; + } + + public void testClearAllocationInfo() throws Exception + { + Method method = SeekableStreamSupervisor.class.getDeclaredMethod("clearAllocationInfo"); + method.setAccessible(true); + method.invoke(this); + } } private static class TestableKafkaSupervisorWithCustomIsTaskCurrent extends TestableKafkaSupervisor diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 0faa0b841346..ef65d6d22caa 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -608,13 +608,19 @@ private void changeTaskCountInIOConfig(int desiredActiveTaskCount) } } + /** + * Clears allocation information including active task groups, partition groups, partition offsets, and partition IDs. + *

+ * Note: Does not clear {@link #pendingCompletionTaskGroups} so that the supervisor remembers that these + * tasks are publishing and auto-scaler does not repeatedly attempt a scale down until these tasks + * complete. If this is cleared, the next {@link #discoverTasks()} might add these tasks to + * {@link #activelyReadingTaskGroups}. + */ private void clearAllocationInfo() { activelyReadingTaskGroups.clear(); partitionGroups.clear(); partitionOffsets.clear(); - - pendingCompletionTaskGroups.clear(); partitionIds.clear(); }