diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index f6a026e6aa79..4f6a93ce3a1e 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -71,7 +71,6 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig; -import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig; @@ -123,7 +122,6 @@ import javax.annotation.Nullable; import java.io.IOException; -import java.lang.reflect.Method; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; @@ -5218,12 +5216,10 @@ public void test_autoScaler_doesNotRepeatScaleDownActions_ifTasksAreStillPublish hasPendingTasks ); - // Now call clearAllocationInfo() - this is where the bug was - // The bug was that this method cleared pendingCompletionTaskGroups - supervisor.testClearAllocationInfo(); + // Clear the partition assignments (this is called when task count has changed) + supervisor.clearPartitionAssignmentsForScaling(); - // THE KEY ASSERTION: Verify pendingCompletionTaskGroups is still NOT empty after clearAllocationInfo - // This is the fix - clearAllocationInfo should preserve pendingCompletionTaskGroups + // 
Verify that pendingCompletionTaskGroups has not been cleared boolean stillHasPendingTasks = false; for (int groupId = 0; groupId < 3; groupId++) { if (supervisor.getPendingCompletionTaskGroupsCount(groupId) > 0) { @@ -5243,6 +5239,9 @@ public void test_autoScaler_doesNotRepeatScaleDownActions_ifTasksAreStillPublish 0, supervisor.getActivelyReadingTaskGroupsCount() ); + + // Verify that partitionOffsets have not been cleared either + Assert.assertFalse(supervisor.getPartitionOffsets().isEmpty()); } private void addSomeEvents(int numEventsPerPartition) throws Exception @@ -5962,13 +5961,6 @@ public int getPendingCompletionTaskGroupsCount(int groupId) CopyOnWriteArrayList groups = getPendingCompletionTaskGroups(groupId); return groups != null ? groups.size() : 0; } - - public void testClearAllocationInfo() throws Exception - { - Method method = SeekableStreamSupervisor.class.getDeclaredMethod("clearAllocationInfo"); - method.setAccessible(true); - method.invoke(this); - } } private static class TestableKafkaSupervisorWithCustomIsTaskCurrent extends TestableKafkaSupervisor diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 40049967b2ba..62c439e12b33 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -585,7 +585,7 @@ private boolean changeTaskCount(int desiredActiveTaskCount) final Stopwatch scaleActionStopwatch = Stopwatch.createStarted(); gracefulShutdownInternal(); changeTaskCountInIOConfig(desiredActiveTaskCount); - clearAllocationInfo(); + clearPartitionAssignmentsForScaling(); emitter.emit(ServiceMetricEvent.builder() .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId) 
.setDimension(DruidMetrics.DATASOURCE, dataSource) @@ -621,18 +621,30 @@ private void changeTaskCountInIOConfig(int desiredActiveTaskCount) } /** - * Clears allocation information including active task groups, partition groups, partition offsets, and partition IDs. + * Clears previous partition assignments in preparation for an upcoming scaling event. *

* Note: Does not clear {@link #pendingCompletionTaskGroups} so that the supervisor remembers that these * tasks are publishing and auto-scaler does not repeatedly attempt a scale down until these tasks * complete. If this is cleared, the next {@link #discoverTasks()} might add these tasks to * {@link #activelyReadingTaskGroups}. + *

+ * Also does not clear {@link #partitionOffsets} so that the new tasks remember + * where the previous tasks left off. + *

+ * Since both of these are in-memory structures, a change in Overlord leadership + * might cause duplicate scaling actions and/or intermittent task failures if + * the new generation tasks start with stale offsets. */ - private void clearAllocationInfo() + @VisibleForTesting + public void clearPartitionAssignmentsForScaling() { + // All previous tasks should now be publishing and not actively reading anymore activelyReadingTaskGroups.clear(); + + // partitionGroups will change as taskCount has changed due to scaling partitionGroups.clear(); - partitionOffsets.clear(); + + // partitionIds will be rediscovered from the stream and assigned to respective taskGroups partitionIds.clear(); } @@ -3337,6 +3349,13 @@ public Boolean apply(@Nullable DateTime startTime) } } + /** + * Checks the duration of {@link #activelyReadingTaskGroups}, requests them + * to checkpoint themselves if they have exceeded the specified run duration + * or if early stop has been requested. If checkpoint is successful, the + * {@link #partitionOffsets} are updated and checkpointed tasks are moved to + * {@link #pendingCompletionTaskGroups}. + */ private void checkTaskDuration() throws ExecutionException, InterruptedException { final List>> futures = new ArrayList<>(); @@ -3463,7 +3482,7 @@ void maybeScaleDuringTaskRollover() changeTaskCountInIOConfig(rolloverTaskCount); // Here force reset the supervisor state to be re-calculated on the next iteration of runInternal() call. // This seems the best way to inject task amount recalculation during the rollover. 
- clearAllocationInfo(); + clearPartitionAssignmentsForScaling(); ServiceMetricEvent.Builder event = ServiceMetricEvent .builder() @@ -3976,6 +3995,7 @@ private void createNewTasks() throws JsonProcessingException .collect(Collectors.toSet()); } + log.info("Initializing taskGroup[%d] with startingOffsets[%s].", groupId, simpleStartingOffsets); activelyReadingTaskGroups.put( groupId, new TaskGroup( diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 4a72e01e7c46..dc0db5b56839 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -2310,8 +2310,10 @@ protected SegmentPublishResult updateDataSourceMetadataInTransaction( startMetadataMatchesExisting = startMetadata.asStartMetadata().matches(oldCommitMetadataFromDb.asStartMetadata()); } - if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) { - // Offsets stored in startMetadata is greater than the last commited metadata. + if (startMetadataMatchesExisting) { + // Proceed with the commit + } else if (startMetadataGreaterThanExisting) { + // Offsets stored in startMetadata is greater than the last committed metadata. // This can happen because the previous task is still publishing its segments and can resolve once // the previous task finishes publishing. return SegmentPublishResult.retryableFailure( @@ -2319,12 +2321,15 @@ protected SegmentPublishResult updateDataSourceMetadataInTransaction( + " end state[%s]. Try resetting the supervisor.", startMetadata, oldCommitMetadataFromDb ); - } - - if (!startMetadataMatchesExisting) { - // Not in the desired start state. 
+ } else { + // startMetadata is older than committed metadata + // The task trying to publish is probably a replica trying to commit offsets already published by another task. + // OR the metadata has been updated manually return SegmentPublishResult.fail( - "Inconsistency between stored metadata state[%s] and target state[%s]. Try resetting the supervisor.", + "Stored metadata state[%s] has already been updated by other tasks and" + + " has diverged from the expected start metadata state[%s]." + + " This task will be replaced by the supervisor with a new task using updated start offsets." + + " Try resetting the supervisor if the issue persists.", oldCommitMetadataFromDb, startMetadata ); } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 0beecb318956..9add89949c83 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -945,8 +945,10 @@ public void testTransactionalAnnounceFailDbNotNullWantNull() ); Assert.assertEquals( SegmentPublishResult.fail( - "Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}]" - + " and target state[ObjectMetadata{theObject=null}]. Try resetting the supervisor." + "Stored metadata state[ObjectMetadata{theObject={foo=baz}}] has already been updated by other tasks" + + " and has diverged from the expected start metadata state[ObjectMetadata{theObject=null}]." + + " This task will be replaced by the supervisor with a new task using updated start offsets." + + " Try resetting the supervisor if the issue persists." 
), result2 ); @@ -1093,8 +1095,10 @@ public void testTransactionalAnnounceFailDbNotNullWantDifferent() ); Assert.assertEquals( SegmentPublishResult.fail( - "Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}] and " - + "target state[ObjectMetadata{theObject={foo=qux}}]. Try resetting the supervisor." + "Stored metadata state[ObjectMetadata{theObject={foo=baz}}] has already been updated by other tasks" + + " and has diverged from the expected start metadata state[ObjectMetadata{theObject={foo=qux}}]." + + " This task will be replaced by the supervisor with a new task using updated start offsets." + + " Try resetting the supervisor if the issue persists." ), result2 );