
KAFKA-12520: Ensure log loading does not truncate producer state unless required #10388

Closed
dhruvilshah3 wants to merge 4 commits into apache:trunk from dhruvilshah3:producer-state

Conversation

@dhruvilshah3
Contributor

When we find a .swap file on startup, we typically want to rename and replace it as .log, .index, .timeindex, etc. as a way to complete any ongoing replace operations. These swap files are usually known to have been flushed to disk before the replace operation begins.

One flaw in the current logic is that we recover these swap files on startup and, as part of that, end up truncating the producer state and rebuilding it from scratch. This is unneeded because the replace operation does not mutate the producer state by itself; it is only meant to replace the .log file along with the corresponding indices. Because of this unneeded producer state rebuild, we have seen multi-hour startup times for clusters that have large compacted topics.

This patch fixes the issue by sanity-checking all records in the segment to be swapped and rebuilding the corresponding indices without mutating the producer state. Similarly, we also rebuild indices without truncating the producer state when we find a missing or corrupted index in the middle of the log.

The patch also adds an extra sanity check to detect invalid bytes at the end of swap segments. Before this patch, we would truncate invalid bytes from the swap segment which could leave us with holes in the log. Because this is an unexpected scenario, we now raise an exception in such cases which will fail the broker on startup.
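To make the trailing-bytes check concrete, here is a minimal, hypothetical sketch (not the actual Kafka `LogSegment` code; `SwapSegmentCheck`, `Batch`, and the byte counts are illustrative stand-ins):

```scala
// Hypothetical sketch: walk the batches of a swap segment, count the bytes
// covered by valid batches, and treat any leftover bytes at the end of the
// file as corruption that should fail startup rather than be truncated.
object SwapSegmentCheck {
  // Simplified stand-in for a record batch read from the segment file.
  final case class Batch(baseOffset: Long, sizeInBytes: Int, crcOk: Boolean)

  // Number of bytes at the end of the segment that are not part of a valid
  // batch (0 means the segment is fully valid).
  def invalidTrailingBytes(segmentSizeInBytes: Int, batches: Seq[Batch]): Int = {
    val validBytes = batches.takeWhile(_.crcOk).map(_.sizeInBytes).sum
    segmentSizeInBytes - validBytes
  }

  def validateOrThrow(segmentSizeInBytes: Int, batches: Seq[Batch]): Unit = {
    val leftover = invalidTrailingBytes(segmentSizeInBytes, batches)
    if (leftover > 0)
      // Including the byte count makes the failure actionable from the logs.
      throw new IllegalStateException(
        s"Found $leftover invalid bytes at the end of the swap segment")
  }
}
```

In this model, a segment whose size exactly equals the sum of its valid batch sizes passes, while any extra bytes raise an exception instead of being silently truncated, which is the behavior change the patch describes.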

Contributor

@junrao junrao left a comment


@dhruvilshah3 : Thanks for the PR. A couple of comments below.

Comment thread: core/src/main/scala/kafka/log/Log.scala (Outdated)
"recovering segment and rebuilding index files...")
recoverSegment(segment)
if (segment.validateSegmentAndRebuildIndices() > 0)
throw new KafkaStorageException(s"Found invalid or corrupted messages in segment ${segment.log.file}")
Contributor


Perhaps we could report the number of invalid bytes in the exception? Ditto below and in completeSwapOperations().

Contributor Author


Done.

*/
@nonthreadsafe
def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
def validateSegmentAndRebuildIndices(batchCallbackOpt: Option[FileChannelRecordBatch => Unit] = None) : Int = {
Contributor


It seems this method needs the logic to trim the indexes at the end?

Contributor Author


Good catch, I added that.

Comment thread: core/src/main/scala/kafka/log/Log.scala (Outdated)
error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " +
"recovering segment and rebuilding index files...")
recoverSegment(segment)
if (segment.validateSegmentAndRebuildIndices() > 0)
Contributor


Another thing is that it's possible for a segment after recovery point to have no index file and also be corrupted. In that case, we want to truncate the data instead of failing with an error.

Contributor Author


Makes sense. I reworked the logic to handle unflushed files the right way.

def validateSegmentAndRebuildIndices(batchCallbackOpt: Option[FileChannelRecordBatch => Unit] = None) : Int = {
offsetIndex.reset()
timeIndex.reset()
txnIndex.reset()
Contributor Author


There is another problem here in that we are not rebuilding the transaction index. The current logic seems pretty tied up with producer state maintenance. I will try to see if there's a way to separate it out.

@dhruvilshah3
Contributor Author

Closing this PR as it's being taken forward in #10763.

@dhruvilshah3 dhruvilshah3 deleted the producer-state branch June 9, 2021 23:41
junrao pushed a commit that referenced this pull request Jun 29, 2021
…ss required (#10763)

When we find a .swap file on startup, we typically want to rename and replace it as .log, .index, .timeindex, etc. as a way to complete any ongoing replace operations. These swap files are usually known to have been flushed to disk before the replace operation begins.

One flaw in the current logic is that we recover these swap files on startup and, as part of that, end up truncating the producer state and rebuilding it from scratch. This is unneeded because the replace operation does not mutate the producer state by itself; it is only meant to replace the .log file along with the corresponding indices. Because of this unneeded producer state rebuild, we have seen multi-hour startup times for clusters that have large compacted topics.

This patch fixes the issue. With ext4 in ordered mode, file metadata operations are persisted in order, regardless of whether the shutdown was clean or unclean. As a result, we rework the recovery workflow as follows.

1. If there are any .cleaned files, we delete all .swap files with higher or equal offsets due to KAFKA-6264. We also delete the .cleaned files. If there is no .cleaned file, we do nothing for this step.
2. If there are any .log.swap files left after step 1, they, together with their index files, must have been renamed from .cleaned and are complete (renaming from .cleaned to .swap happens in reverse offset order). We rename these .log.swap files and their corresponding index files to regular files, while deleting the original files from compaction or segment split if they haven't been deleted.
3. Do log splitting for legacy log segments with offset overflow (KAFKA-6264).
4. If there are any other index swap files left, they must come from a partial renaming from .swap files to regular files. We can simply rename them to regular files.

Credit: some code is copied from @dhruvilshah3's PR #10388.
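The delete-or-rename decisions above (steps 1, 2, and 4; the offset-overflow splitting of step 3 is omitted) can be sketched as a pure planning function. This is a hypothetical simplification, assuming file names of the form `<zero-padded base offset>.<suffix>`; `SwapCleanupPlan` and its suffix handling are illustrative, not the actual `LogLoader` code:

```scala
// Hypothetical sketch of the reworked startup cleanup: decide, per file,
// whether it should be deleted, renamed to a regular file, or kept as-is.
// Assumes names like "00000000000000000100.log.swap" (offset prefix first).
object SwapCleanupPlan {
  sealed trait Action
  case object Delete extends Action
  case object RenameToRegular extends Action
  case object Keep extends Action

  private def baseOffset(name: String): Long = name.takeWhile(_.isDigit).toLong

  def plan(files: Seq[String]): Map[String, Action] = {
    // Step 1: any .cleaned file means a swap was not fully prepared, so all
    // .cleaned files and all .swap files at or above the lowest .cleaned
    // offset are deleted (the KAFKA-6264 interaction described above).
    val cleanedOffsets = files.filter(_.endsWith(".cleaned")).map(baseOffset)
    val minCleaned = cleanedOffsets.reduceOption(_ min _).getOrElse(Long.MaxValue)
    files.map { f =>
      val action =
        if (f.endsWith(".cleaned")) Delete
        else if (f.endsWith(".swap") && baseOffset(f) >= minCleaned) Delete
        // Steps 2/4: surviving swap files are complete; rename to regular.
        else if (f.endsWith(".swap")) RenameToRegular
        else Keep
      f -> action
    }.toMap
  }
}
```

For example, given a complete `.log.swap` at offset 100 and a `.cleaned`/`.log.swap` pair at offset 200, the plan renames the former and deletes the latter two, while regular `.log` files are left untouched.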

Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Jun Rao <junrao@gmail.com>
xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…ss required (apache#10763)
