Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
Expand Down Expand Up @@ -122,6 +123,7 @@

import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
Expand All @@ -134,6 +136,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
Expand Down Expand Up @@ -5047,6 +5050,201 @@ public void test_doesTaskMatchSupervisor()
EasyMock.replay(differentTaskType);
}

/**
 * Regression test: once {@code gracefulShutdownInternal()} has moved the task groups out of the
 * actively-reading set, invoking {@code clearAllocationInfo()} must leave at least one
 * pending-completion task group in place and keep the actively-reading count at zero.
 * <p>
 * Flow:
 * <ol>
 * <li>Three tasks (partitions 0, 1 and 2) are registered with the mocks and discovered via
 * {@code start()} followed by {@code runInternal()}; the actively-reading count must be 3.</li>
 * <li>The task runner, task client and task queue mocks are reset and re-primed for one pause and
 * one set-end-offsets call per task, then {@code gracefulShutdownInternal()} is called; the
 * actively-reading count must drop to 0 and some group must be pending completion.</li>
 * <li>{@code clearAllocationInfo()} is invoked through the reflective test hook
 * {@code testClearAllocationInfo()} and both conditions are re-checked.</li>
 * </ol>
 */
@Test
public void test_autoScaler_doesNotRepeatScaleDownActions_ifTasksAreStillPublishing() throws Exception
{
final TaskLocation location1 = TaskLocation.create("testHost1", 1234, -1);
final TaskLocation location2 = TaskLocation.create("testHost2", 1235, -1);
final TaskLocation location3 = TaskLocation.create("testHost3", 1236, -1);
final DateTime startTime = DateTimes.nowUtc();

// Create supervisor with 3 task groups
supervisor = getTestableSupervisor(1, 3, true, "PT1H", null, null);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
addSomeEvents(100);

// Manually create 3 tasks (one for each partition/task group)
// Each task starts at offset 0 of its own partition and has an open-ended (Long.MAX_VALUE) end offset.
Task task1 = createKafkaIndexTask(
"task1",
DATASOURCE,
0,
new SeekableStreamStartSequenceNumbers<>(
topic,
singlePartitionMap(topic, 0, 0L),
ImmutableSet.of()
),
new SeekableStreamEndSequenceNumbers<>(
topic,
singlePartitionMap(topic, 0, Long.MAX_VALUE)
),
null,
null,
tuningConfig
);

Task task2 = createKafkaIndexTask(
"task2",
DATASOURCE,
1,
new SeekableStreamStartSequenceNumbers<>(
topic,
singlePartitionMap(topic, 1, 0L),
ImmutableSet.of()
),
new SeekableStreamEndSequenceNumbers<>(
topic,
singlePartitionMap(topic, 1, Long.MAX_VALUE)
),
null,
null,
tuningConfig
);

Task task3 = createKafkaIndexTask(
"task3",
DATASOURCE,
2,
new SeekableStreamStartSequenceNumbers<>(
topic,
singlePartitionMap(topic, 2, 0L),
ImmutableSet.of()
),
new SeekableStreamEndSequenceNumbers<>(
topic,
singlePartitionMap(topic, 2, Long.MAX_VALUE)
),
null,
null,
tuningConfig
);

// The work items are what the mocked task runner reports as running tasks in both phases below.
Collection workItems = new ArrayList<>();
workItems.add(new TestTaskRunnerWorkItem(task1, null, location1));
workItems.add(new TestTaskRunnerWorkItem(task2, null, location2));
workItems.add(new TestTaskRunnerWorkItem(task3, null, location3));

// Setup mocks for initial discovery of tasks
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(task1.getId())).andReturn(location1).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(task2.getId())).andReturn(location2).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(task3.getId())).andReturn(location3).anyTimes();
EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(toMap(task1, task2, task3)).anyTimes();
EasyMock.expect(taskStorage.getStatus(task1.getId())).andReturn(Optional.of(TaskStatus.running(task1.getId()))).anyTimes();
EasyMock.expect(taskStorage.getStatus(task2.getId())).andReturn(Optional.of(TaskStatus.running(task2.getId()))).anyTimes();
EasyMock.expect(taskStorage.getStatus(task3.getId())).andReturn(Optional.of(TaskStatus.running(task3.getId()))).anyTimes();
EasyMock.expect(taskStorage.getTask(task1.getId())).andReturn(Optional.of(task1)).anyTimes();
EasyMock.expect(taskStorage.getTask(task2.getId())).andReturn(Optional.of(task2)).anyTimes();
EasyMock.expect(taskStorage.getTask(task3.getId())).andReturn(Optional.of(task3)).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)).anyTimes();
// No anyTimes() on the task-client expectations: EasyMock's default is exactly one call each.
EasyMock.expect(taskClient.getStatusAsync(task1.getId())).andReturn(Futures.immediateFuture(Status.READING));
EasyMock.expect(taskClient.getStatusAsync(task2.getId())).andReturn(Futures.immediateFuture(Status.READING));
EasyMock.expect(taskClient.getStatusAsync(task3.getId())).andReturn(Futures.immediateFuture(Status.READING));
EasyMock.expect(taskClient.getStartTimeAsync(task1.getId())).andReturn(Futures.immediateFuture(startTime));
EasyMock.expect(taskClient.getStartTimeAsync(task2.getId())).andReturn(Futures.immediateFuture(startTime));
EasyMock.expect(taskClient.getStartTimeAsync(task3.getId())).andReturn(Futures.immediateFuture(startTime));

// One checkpoint (sequence 0 -> offset 0) per task, keyed by that task's own partition.
TreeMap<Integer, Map<KafkaTopicPartition, Long>> checkpoints1 = new TreeMap<>();
checkpoints1.put(0, singlePartitionMap(topic, 0, 0L));
TreeMap<Integer, Map<KafkaTopicPartition, Long>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, singlePartitionMap(topic, 1, 0L));
TreeMap<Integer, Map<KafkaTopicPartition, Long>> checkpoints3 = new TreeMap<>();
checkpoints3.put(0, singlePartitionMap(topic, 2, 0L));

EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.eq(task1.getId()), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints1))
.times(1);
EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.eq(task2.getId()), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints2))
.times(1);
EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.eq(task3.getId()), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints3))
.times(1);

// Called while the mock is still in record state, so this records an expected registerListener call.
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
replayAll();

// Start supervisor and discover the 3 existing tasks
supervisor.start();
supervisor.runInternal();
verifyAll();

// Verify we have 3 actively reading task groups after discovery
Assert.assertEquals(
"Should have 3 actively reading task groups after discovery",
3,
supervisor.getActivelyReadingTaskGroupsCount()
);

// Reset and setup mocks for gracefulShutdownInternal
// Only these three mocks are reset; the remaining mocks keep the expectations recorded above.
EasyMock.reset(taskRunner, taskClient, taskQueue);
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(task1.getId())).andReturn(location1).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(task2.getId())).andReturn(location2).anyTimes();
EasyMock.expect(taskRunner.getTaskLocation(task3.getId())).andReturn(location3).anyTimes();
// Each task is expected to be paused once (reporting offset 100) and then given end offsets once.
EasyMock.expect(taskClient.pauseAsync(task1.getId())).andReturn(Futures.immediateFuture(singlePartitionMap(topic, 0, 100L)));
EasyMock.expect(taskClient.pauseAsync(task2.getId())).andReturn(Futures.immediateFuture(singlePartitionMap(topic, 1, 100L)));
EasyMock.expect(taskClient.pauseAsync(task3.getId())).andReturn(Futures.immediateFuture(singlePartitionMap(topic, 2, 100L)));
EasyMock.expect(taskClient.setEndOffsetsAsync(EasyMock.eq(task1.getId()), EasyMock.anyObject(), EasyMock.eq(true))).andReturn(Futures.immediateFuture(true));
EasyMock.expect(taskClient.setEndOffsetsAsync(EasyMock.eq(task2.getId()), EasyMock.anyObject(), EasyMock.eq(true))).andReturn(Futures.immediateFuture(true));
EasyMock.expect(taskClient.setEndOffsetsAsync(EasyMock.eq(task3.getId()), EasyMock.anyObject(), EasyMock.eq(true))).andReturn(Futures.immediateFuture(true));

EasyMock.replay(taskRunner, taskClient, taskQueue);

// Simulate autoscaler scale-down by calling gracefulShutdownInternal()
// This should move tasks from activelyReadingTaskGroups to pendingCompletionTaskGroups
supervisor.gracefulShutdownInternal();

verifyAll();

// After gracefulShutdownInternal, tasks should be moved to pendingCompletionTaskGroups
Assert.assertEquals(
"activelyReadingTaskGroups should be empty after gracefulShutdownInternal",
0,
supervisor.getActivelyReadingTaskGroupsCount()
);

// Verify pendingCompletionTaskGroups is NOT empty (tasks were moved there)
// NOTE(review): group ids 0..2 presumably correspond to the three tasks above - confirm against
// how the supervisor assigns task group ids. Only one non-empty group is required to pass.
boolean hasPendingTasks = false;
for (int groupId = 0; groupId < 3; groupId++) {
if (supervisor.getPendingCompletionTaskGroupsCount(groupId) > 0) {
hasPendingTasks = true;
break;
}
}
Assert.assertTrue(
"pendingCompletionTaskGroups should contain task groups after gracefulShutdownInternal",
hasPendingTasks
);

// Now call clearAllocationInfo() - this is where the bug was
// The bug was that this method cleared pendingCompletionTaskGroups
supervisor.testClearAllocationInfo();

// THE KEY ASSERTION: Verify pendingCompletionTaskGroups is still NOT empty after clearAllocationInfo
// This is the fix - clearAllocationInfo should preserve pendingCompletionTaskGroups
boolean stillHasPendingTasks = false;
for (int groupId = 0; groupId < 3; groupId++) {
if (supervisor.getPendingCompletionTaskGroupsCount(groupId) > 0) {
stillHasPendingTasks = true;
break;
}
}
Assert.assertTrue(
"pendingCompletionTaskGroups should be preserved after clearAllocationInfo() " +
"to prevent autoscaler from creating duplicate history entries",
stillHasPendingTasks
);

// Verify activelyReadingTaskGroups is still empty
Assert.assertEquals(
"activelyReadingTaskGroups should remain empty after clearAllocationInfo",
0,
supervisor.getActivelyReadingTaskGroupsCount()
);
}

private void addSomeEvents(int numEventsPerPartition) throws Exception
{
// create topic manually
Expand Down Expand Up @@ -5753,6 +5951,24 @@ private SeekableStreamSupervisorStateManager getStateManager()
{
return stateManager;
}

/**
 * Test accessor that reports the value of {@code getActiveTaskGroupsCount()} under a name
 * matching the assertions in the tests.
 *
 * @return whatever {@code getActiveTaskGroupsCount()} currently returns
 */
public int getActivelyReadingTaskGroupsCount()
{
  final int activeGroupCount = getActiveTaskGroupsCount();
  return activeGroupCount;
}

/**
 * Test accessor for the number of pending-completion task groups registered under a group id.
 *
 * @param groupId task group id to look up
 * @return size of the list returned by {@code getPendingCompletionTaskGroups(groupId)}, or 0 when
 *         that call returns {@code null}
 */
public int getPendingCompletionTaskGroupsCount(int groupId)
{
  final CopyOnWriteArrayList<?> pendingForGroup = getPendingCompletionTaskGroups(groupId);
  if (pendingForGroup == null) {
    return 0;
  }
  return pendingForGroup.size();
}

/**
 * Test hook that reflectively invokes the no-arg {@code clearAllocationInfo()} method declared on
 * {@link SeekableStreamSupervisor} against this instance, bypassing its access modifier.
 *
 * @throws Exception if the method cannot be looked up or made accessible, or if the reflective
 *                   invocation fails
 */
public void testClearAllocationInfo() throws Exception
{
  final Method clearAllocationInfo =
      SeekableStreamSupervisor.class.getDeclaredMethod("clearAllocationInfo");
  clearAllocationInfo.setAccessible(true);
  clearAllocationInfo.invoke(this);
}
}

private static class TestableKafkaSupervisorWithCustomIsTaskCurrent extends TestableKafkaSupervisor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,13 +608,19 @@ private void changeTaskCountInIOConfig(int desiredActiveTaskCount)
}
}

/**
 * Clears allocation information: active task groups, partition groups, partition offsets, and
 * partition IDs.
 * <p>
 * Note: Does not clear {@link #pendingCompletionTaskGroups} so that the supervisor remembers that
 * these tasks are publishing and the auto-scaler does not repeatedly attempt a scale down until
 * these tasks complete. If this is cleared, the next {@link #discoverTasks()} might add these tasks
 * to {@link #activelyReadingTaskGroups}.
 */
private void clearAllocationInfo()
{
  activelyReadingTaskGroups.clear();
  partitionGroups.clear();
  partitionOffsets.clear();
  // pendingCompletionTaskGroups is intentionally left untouched here; see the Javadoc above.
  partitionIds.clear();
}

Expand Down