From 9ccbb8e951b73c8bdad6cdad33c00a6685693b62 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 19 Feb 2026 12:23:02 +0530 Subject: [PATCH 1/5] Fix recurring bug "Inconsistency between stored metadata" during auto-scaling --- .../supervisor/SeekableStreamSupervisor.java | 29 +++++++++++++++---- .../IndexerSQLMetadataStorageCoordinator.java | 17 +++++++---- ...exerSQLMetadataStorageCoordinatorTest.java | 12 +++++--- 3 files changed, 43 insertions(+), 15 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 40049967b2ba..dae1741862c5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -585,7 +585,7 @@ private boolean changeTaskCount(int desiredActiveTaskCount) final Stopwatch scaleActionStopwatch = Stopwatch.createStarted(); gracefulShutdownInternal(); changeTaskCountInIOConfig(desiredActiveTaskCount); - clearAllocationInfo(); + clearPartitionAssignmentsForScaling(); emitter.emit(ServiceMetricEvent.builder() .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId) .setDimension(DruidMetrics.DATASOURCE, dataSource) @@ -621,18 +621,29 @@ private void changeTaskCountInIOConfig(int desiredActiveTaskCount) } /** - * Clears allocation information including active task groups, partition groups, partition offsets, and partition IDs. + * Clears previous partition assignments in preparation for an upcoming scaling event. *

* Note: Does not clear {@link #pendingCompletionTaskGroups} so that the supervisor remembers that these * tasks are publishing and auto-scaler does not repeatedly attempt a scale down until these tasks * complete. If this is cleared, the next {@link #discoverTasks()} might add these tasks to * {@link #activelyReadingTaskGroups}. + *

+ * Also does not clear {@link #partitionOffsets} so that the new tasks remember + * where the previous tasks had left off. + *

+ * Since both of these are in-memory structures, a change in Overlord leadership + * might cause duplicate scaling actions and/or intermittent task failures due + * to {@code "Inconsistency between stored metadata and target"}. */ - private void clearAllocationInfo() + private void clearPartitionAssignmentsForScaling() { + // All previous tasks should now be publishing and not actively reading anymore activelyReadingTaskGroups.clear(); + + // partitionGroups will change as taskCount has changed due to scaling partitionGroups.clear(); - partitionOffsets.clear(); + + // partitionIds will be rediscovered from the stream and assigned to respective taskGroups partitionIds.clear(); } @@ -3337,6 +3348,13 @@ public Boolean apply(@Nullable DateTime startTime) } } + /** + * Checks the duration of {@link #activelyReadingTaskGroups}, requests them + * to checkpoint themselves if they have exceeded the specified run duration + * or if early stop has been requested. If checkpoint is successfull, the + * {@link #partitionOffsets} are updated and checkpointed tasks are moved to + * {@link #pendingCompletionTaskGroups}. + */ private void checkTaskDuration() throws ExecutionException, InterruptedException { final List>> futures = new ArrayList<>(); @@ -3463,7 +3481,7 @@ void maybeScaleDuringTaskRollover() changeTaskCountInIOConfig(rolloverTaskCount); // Here force reset the supervisor state to be re-calculated on the next iteration of runInternal() call. // This seems the best way to inject task amount recalculation during the rollover. - clearAllocationInfo(); + clearPartitionAssignmentsForScaling(); ServiceMetricEvent.Builder event = ServiceMetricEvent .builder() @@ -3976,6 +3994,7 @@ private void createNewTasks() throws JsonProcessingException .collect(Collectors.toSet()); } + log.info("Initializing taskGroup[%d] with startingOffsets[%s].", groupId, simpleStartingOffsets); activelyReadingTaskGroups.put( groupId, new TaskGroup( diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 4a72e01e7c46..84d4aeb78c08 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -2310,7 +2310,9 @@ protected SegmentPublishResult updateDataSourceMetadataInTransaction( startMetadataMatchesExisting = startMetadata.asStartMetadata().matches(oldCommitMetadataFromDb.asStartMetadata()); } - if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) { + if (startMetadataMatchesExisting) { + // Proceed with the commit + } else if (startMetadataGreaterThanExisting) { // Offsets stored in startMetadata is greater than the last commited metadata. // This can happen because the previous task is still publishing its segments and can resolve once // the previous task finishes publishing. @@ -2319,12 +2321,15 @@ protected SegmentPublishResult updateDataSourceMetadataInTransaction( + " end state[%s]. Try resetting the supervisor.", startMetadata, oldCommitMetadataFromDb ); - } - - if (!startMetadataMatchesExisting) { - // Not in the desired start state. + } else { + // startMetadata is older than committed metadata + // The task trying to publish is probably a replica trying to commit offsets already published by another task. + // OR the metadata has been updated manually return SegmentPublishResult.fail( - "Inconsistency between stored metadata state[%s] and target state[%s]. Try resetting the supervisor.", + "Stored metadata state[%s] has already been updated by other tasks and" + + " has diverged from the expected start metadata state[%s]." + + " This task will be replaced by the supervisor with a new task using updated start offsets." + + " Reset the supervisor if the issue persists.", oldCommitMetadataFromDb, startMetadata ); } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 0beecb318956..e16a67f2bcb1 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -945,8 +945,10 @@ public void testTransactionalAnnounceFailDbNotNullWantNull() ); Assert.assertEquals( SegmentPublishResult.fail( - "Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}]" - + " and target state[ObjectMetadata{theObject=null}]. Try resetting the supervisor." + "Stored metadata state[ObjectMetadata{theObject={foo=baz}}] has already been updated by other tasks" + + " and has diverged from the expected start metadata state[ObjectMetadata{theObject=null}]." + + " This task will be replaced by the supervisor with a new task using updated start offsets." + + " Reset the supervisor if the issue persists." ), result2 ); @@ -1093,8 +1095,10 @@ public void testTransactionalAnnounceFailDbNotNullWantDifferent() ); Assert.assertEquals( SegmentPublishResult.fail( - "Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}] and " - + "target state[ObjectMetadata{theObject={foo=qux}}]. Try resetting the supervisor." + "Stored metadata state[ObjectMetadata{theObject={foo=baz}}] has already been updated by other tasks" + + " and has diverged from the expected start metadata state[ObjectMetadata{theObject={foo=qux}}]." + + " This task will be replaced by the supervisor with a new task using updated start offsets." + + " Reset the supervisor if the issue persists." ), result2 ); From e62823b47c668742dc4cd02006794b9bb881cc9c Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 19 Feb 2026 12:51:33 +0530 Subject: [PATCH 2/5] Update test to verify new behaviour --- .../kafka/supervisor/KafkaSupervisorTest.java | 20 ++++++------------- .../supervisor/SeekableStreamSupervisor.java | 3 ++- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index f6a026e6aa79..4f6a93ce3a1e 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -71,7 +71,6 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig; -import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig; @@ -123,7 +122,6 @@ import javax.annotation.Nullable; import java.io.IOException; -import java.lang.reflect.Method; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; @@ -5218,12 +5216,10 @@ public void test_autoScaler_doesNotRepeatScaleDownActions_ifTasksAreStillPublish hasPendingTasks ); - // Now call clearAllocationInfo() - this is where the bug was - // The bug was that this method cleared pendingCompletionTaskGroups - supervisor.testClearAllocationInfo(); + // Clear the partition assignments (this is called when task count has changed) + supervisor.clearPartitionAssignmentsForScaling(); - // THE KEY ASSERTION: Verify pendingCompletionTaskGroups is still NOT empty after clearAllocationInfo - // This is the fix - clearAllocationInfo should preserve pendingCompletionTaskGroups + // Verify that pendingCompletionTaskGroups has not been cleared boolean stillHasPendingTasks = false; for (int groupId = 0; groupId < 3; groupId++) { if (supervisor.getPendingCompletionTaskGroupsCount(groupId) > 0) { @@ -5243,6 +5239,9 @@ public void test_autoScaler_doesNotRepeatScaleDownActions_ifTasksAreStillPublish 0, supervisor.getActivelyReadingTaskGroupsCount() ); + + // Verify that partitionOffsets have not been cleared either + Assert.assertFalse(supervisor.getPartitionOffsets().isEmpty()); } private void addSomeEvents(int numEventsPerPartition) throws Exception @@ -5962,13 +5961,6 @@ public int getPendingCompletionTaskGroupsCount(int groupId) CopyOnWriteArrayList groups = getPendingCompletionTaskGroups(groupId); return groups != null ? groups.size() : 0; } - - public void testClearAllocationInfo() throws Exception - { - Method method = SeekableStreamSupervisor.class.getDeclaredMethod("clearAllocationInfo"); - method.setAccessible(true); - method.invoke(this); - } } private static class TestableKafkaSupervisorWithCustomIsTaskCurrent extends TestableKafkaSupervisor diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index dae1741862c5..26252e2d5d04 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -635,7 +635,8 @@ private void changeTaskCountInIOConfig(int desiredActiveTaskCount) * might cause duplicate scaling actions and/or intermittent task failures due * to {@code "Inconsistency between stored metadata and target"}. */ - private void clearPartitionAssignmentsForScaling() + @VisibleForTesting + public void clearPartitionAssignmentsForScaling() { // All previous tasks should now be publishing and not actively reading anymore activelyReadingTaskGroups.clear(); From b74ceddb70abbd65ea11ea77625adf7beb044859 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 19 Feb 2026 17:44:10 +0530 Subject: [PATCH 3/5] Fix javadoc --- .../seekablestream/supervisor/SeekableStreamSupervisor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 26252e2d5d04..59f3e668225e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -632,8 +632,8 @@ private void changeTaskCountInIOConfig(int desiredActiveTaskCount) * where the previous tasks had left off. *

* Since both of these are in-memory structures, a change in Overlord leadership - * might cause duplicate scaling actions and/or intermittent task failures due - * to {@code "Inconsistency between stored metadata and target"}. + * might cause duplicate scaling actions and/or intermittent task failures if + * the new generation tasks start with stale offsets. */ @VisibleForTesting public void clearPartitionAssignmentsForScaling() From 8177f4fbefb3a415d5cbd05b51342b3b8f2a3884 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 19 Feb 2026 22:13:10 +0530 Subject: [PATCH 4/5] Fix typos, error message --- .../seekablestream/supervisor/SeekableStreamSupervisor.java | 2 +- .../druid/metadata/IndexerSQLMetadataStorageCoordinator.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 59f3e668225e..62c439e12b33 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -3352,7 +3352,7 @@ public Boolean apply(@Nullable DateTime startTime) /** * Checks the duration of {@link #activelyReadingTaskGroups}, requests them * to checkpoint themselves if they have exceeded the specified run duration - * or if early stop has been requested. If checkpoint is successfull, the + * or if early stop has been requested. If checkpoint is successful, the * {@link #partitionOffsets} are updated and checkpointed tasks are moved to * {@link #pendingCompletionTaskGroups}. */ diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 84d4aeb78c08..dc0db5b56839 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -2313,7 +2313,7 @@ protected SegmentPublishResult updateDataSourceMetadataInTransaction( if (startMetadataMatchesExisting) { // Proceed with the commit } else if (startMetadataGreaterThanExisting) { - // Offsets stored in startMetadata is greater than the last commited metadata. + // Offsets stored in startMetadata is greater than the last committed metadata. // This can happen because the previous task is still publishing its segments and can resolve once // the previous task finishes publishing. return SegmentPublishResult.retryableFailure( @@ -2329,7 +2329,7 @@ protected SegmentPublishResult updateDataSourceMetadataInTransaction( "Stored metadata state[%s] has already been updated by other tasks and" + " has diverged from the expected start metadata state[%s]." + " This task will be replaced by the supervisor with a new task using updated start offsets." - + " Reset the supervisor if the issue persists.", + + " Try resetting the supervisor if the issue persists.", oldCommitMetadataFromDb, startMetadata ); } From 068f44368af8641e7e567a145b5a3589caa0d9a5 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 20 Feb 2026 10:19:59 +0530 Subject: [PATCH 5/5] Fix test --- .../metadata/IndexerSQLMetadataStorageCoordinatorTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index e16a67f2bcb1..9add89949c83 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -948,7 +948,7 @@ public void testTransactionalAnnounceFailDbNotNullWantNull() "Stored metadata state[ObjectMetadata{theObject={foo=baz}}] has already been updated by other tasks" + " and has diverged from the expected start metadata state[ObjectMetadata{theObject=null}]." + " This task will be replaced by the supervisor with a new task using updated start offsets." - + " Reset the supervisor if the issue persists." + + " Try resetting the supervisor if the issue persists." ), result2 ); @@ -1098,7 +1098,7 @@ public void testTransactionalAnnounceFailDbNotNullWantDifferent() "Stored metadata state[ObjectMetadata{theObject={foo=baz}}] has already been updated by other tasks" + " and has diverged from the expected start metadata state[ObjectMetadata{theObject={foo=qux}}]." + " This task will be replaced by the supervisor with a new task using updated start offsets." - + " Reset the supervisor if the issue persists." + + " Try resetting the supervisor if the issue persists." ), result2 );