Fix recurring bug "Inconsistency between stored metadata" during auto-scaling#19034
kfaraz merged 5 commits into apache:master from
Conversation
    * <p>
    * Since both of these are in-memory structures, a change in Overlord leadership
    * might cause duplicate scaling actions and/or intermittent task failures due
    * to {@code "Inconsistency between stored metadata and target"}.
It already has a new exception description :)
Pull request overview
This PR fixes a race condition during aggressive auto-scaling that causes tasks to fail with "Inconsistency between stored metadata and target state" errors. The root cause was that partitionOffsets were being cleared before scaling, causing new task groups to initialize with stale offsets from the metadata store instead of the latest checkpointed offsets from pending task groups.
Changes:
- Modified `clearPartitionAssignmentsForScaling()` (renamed from `clearAllocationInfo()`) to preserve `partitionOffsets` during auto-scaling so subsequent tasks know where previous tasks left off
- Simplified conditional logic in `IndexerSQLMetadataStorageCoordinator` and improved error messages to be more user-friendly
- Updated tests to verify the fix and removed reflection-based testing in favor of the `@VisibleForTesting` annotation
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java | Renamed clearAllocationInfo() to clearPartitionAssignmentsForScaling(), removed clearing of partitionOffsets, improved documentation, and made method public with @VisibleForTesting |
| server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java | Simplified conditional logic for metadata state validation and improved error messages to be more descriptive and user-friendly |
| server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java | Updated test assertions to match new error messages |
| extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java | Removed reflection-based test method, updated to call clearPartitionAssignmentsForScaling() directly, and added assertion to verify partitionOffsets are preserved |
    /**
     * Checks the duration of {@link #activelyReadingTaskGroups}, requests them
     * to checkpoint themselves if they have exceeded the specified run duration
     * or if early stop has been requested. If checkpoint is successfull, the
Typo in the javadoc: "successfull" should be "successful".
    - * or if early stop has been requested. If checkpoint is successfull, the
    + * or if early stop has been requested. If checkpoint is successful, the
    if (startMetadataMatchesExisting) {
      // Proceed with the commit
    } else if (startMetadataGreaterThanExisting) {
      // Offsets stored in startMetadata is greater than the last commited metadata.
Typo in comment: "commited" should be "committed".
    - // Offsets stored in startMetadata is greater than the last commited metadata.
    + // Offsets stored in startMetadata is greater than the last committed metadata.
capistrant left a comment
Nice investigative work @kfaraz. One comment of me pondering the wording of the error messages, which you can do with as you please. I see that the Copilot bot left some nits about spelling in the javadocs that you could batch-commit before merging.
    "Stored metadata state[%s] has already been updated by other tasks and"
    + " has diverged from the expected start metadata state[%s]."
    + " This task will be replaced by the supervisor with a new task using updated start offsets."
    + " Reset the supervisor if the issue persists.",
nit: I wonder if we are better off suggesting the idea of resetting versus directing to do it. The new message is much better than the old, though, since you offer some reasoning and mention resetting only if the issue persists, rather than the old message that just says to try resetting it 😜. The same applies to all the error messages in this file and is really just semantics. Trying to reduce the Druid liability (not that there is such a thing in this sense) if an operator gets mad because they reset without knowing what it does and blames the error message.
Thanks for calling this out. Yeah, I guess it's safer to continue with the suggestive tone for the time being. 😛
Thanks a lot for the reviews, @Fly-Style, @capistrant!

Merging, as the failure is unrelated.
…er group is pending (#19091)

Follow up to #19034

Changes
---------
- Add method `SeekableStreamSupervisor.isAnotherTaskGroupPublishingToPartitions()`
- Use this method to check if a task needs to wait before publishing its own offsets
- Update `SegmentTransactionalAppendAction` and `SegmentTransactionalInsertAction` to return a retryable error response only if there is a pending publish that conflicts with the current action
- Fix behaviour of scale down on task rollover in `SeekableStreamSupervisor`
- Fix bug in `SeekableStreamSupervisorIOConfig`
- Fix bug in `CostBasedAutoScaler` to avoid spurious scale downs
- Validate metrics in `CostBasedAutoScaler` before proceeding with scaling action
- Add new tests in `CostBasedAutoScalerIntegrationTest`
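The "wait before publishing" check in the follow-up can be illustrated with a rough sketch. The class name, data structures, and method body below are hypothetical simplifications for illustration only, not the actual Druid implementation; only the method name `isAnotherTaskGroupPublishingToPartitions()` comes from the PR description. The idea is that a task group must defer its own publish if any *other* pending-completion group is still publishing to one of its partitions.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a publish-conflict check; real Druid code differs.
public class PublishConflictSketch
{
    // taskGroupId -> partitions that the pending-completion group is publishing
    private final Map<Integer, Set<Integer>> pendingCompletionPartitions;

    public PublishConflictSketch(Map<Integer, Set<Integer>> pendingCompletionPartitions)
    {
        this.pendingCompletionPartitions = pendingCompletionPartitions;
    }

    // A group must wait to publish if any other pending group shares a partition with it.
    public boolean isAnotherTaskGroupPublishingToPartitions(int groupId, Set<Integer> partitions)
    {
        for (Map.Entry<Integer, Set<Integer>> entry : pendingCompletionPartitions.entrySet()) {
            if (entry.getKey() == groupId) {
                continue; // a group never conflicts with itself
            }
            for (int partition : entry.getValue()) {
                if (partitions.contains(partition)) {
                    return true; // another group is still publishing to this partition
                }
            }
        }
        return false;
    }
}
```

Under this sketch, a conflicting publish would be surfaced as a retryable error (as `SegmentTransactionalAppendAction` does in the follow-up) rather than a hard failure, so the waiting task simply retries once the other group finishes.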
Description
During aggressive auto-scaling, tasks frequently fail with the error "Inconsistency between stored metadata and target state". This is typically a self-healing issue, as the supervisor re-launches the failed tasks with updated offsets, but it still adds operational overhead and often causes ingestion lag.
The root cause behind this failure seems to be the following race condition:
1. `changeTaskCount()` is called
2. `checkTaskDuration()` tries to checkpoint the actively reading tasks and moves them to pending completion
3. `checkTaskDuration()` also updates the `partitionOffsets` with the latest result of the checkpointing
4. `clearAllocationInfo()` clears `partitionOffsets`

The bug does not occur if task group A is able to finish publishing the offsets before task group B has been created.
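The race above and its fix can be shown with a minimal, self-contained sketch. This is not the actual `SeekableStreamSupervisor` code; the class and its state model are hypothetical simplifications, with only the method names `clearAllocationInfo()` / `clearPartitionAssignmentsForScaling()` and the `partitionOffsets` field taken from the PR. The point is that the scaling cleanup resets task-group bookkeeping but deliberately keeps the checkpointed offsets, so task groups created after scaling start from where the previous ones left off instead of falling back to stale offsets in the metadata store.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the supervisor state involved in the race.
// Hypothetical sketch; NOT the actual Druid SeekableStreamSupervisor.
public class ScalingStateSketch
{
    // taskGroupId -> description of its partition assignment (simplified)
    private final Map<Integer, String> activelyReadingTaskGroups = new HashMap<>();
    // partitionId -> latest checkpointed offset
    private final Map<Integer, Long> partitionOffsets = new HashMap<>();

    void checkpoint(int partition, long offset)
    {
        // checkTaskDuration() records the checkpoint result in partitionOffsets
        partitionOffsets.put(partition, offset);
    }

    // Old behavior (clearAllocationInfo): wiped partitionOffsets too, so new
    // task groups initialized with stale offsets from the metadata store.
    void clearAllocationInfoOld()
    {
        activelyReadingTaskGroups.clear();
        partitionOffsets.clear(); // <-- the bug
    }

    // New behavior (clearPartitionAssignmentsForScaling): keep partitionOffsets.
    void clearPartitionAssignmentsForScaling()
    {
        activelyReadingTaskGroups.clear();
        // partitionOffsets intentionally preserved
    }

    Long offsetFor(int partition)
    {
        return partitionOffsets.get(partition);
    }

    public static void main(String[] args)
    {
        ScalingStateSketch supervisor = new ScalingStateSketch();
        supervisor.activelyReadingTaskGroups.put(0, "partitions 0,1");
        supervisor.checkpoint(0, 150L);

        supervisor.clearPartitionAssignmentsForScaling();

        // Tasks launched after scaling still see the checkpointed offset.
        System.out.println(supervisor.offsetFor(0)); // prints 150
    }
}
```

With the old `clearAllocationInfoOld()` behavior, `offsetFor(0)` would return `null` after cleanup, which is the in-memory analogue of the new task group reading stale offsets and later tripping the "Inconsistency between stored metadata and target state" check.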
Changes
- Preserve `partitionOffsets` before auto-scaling so that subsequent tasks know where the previous tasks had left off.
- Simplified conditional logic and improved error messages in `IndexerSQLMetadataStorageCoordinator`.
- Updated `KafkaSupervisorTest` to not use reflection and updated the test to verify that `partitionOffsets` are preserved.

Note
This bug may still occur if Overlord leadership changes right before the scaling event.
But there is currently no way to handle that, since `partitionOffsets` is an in-memory data structure and is not meant to be persisted.