KAFKA-6388: Recover from rolling an empty segment that already exists #5986
hachikuji merged 9 commits into apache:trunk from
Conversation
hachikuji
left a comment
Thanks, left a few comments.
    if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) {
      // We have seen this happen (see KAFKA-6388) after shouldRoll() returns true for an
      // active segment of size zero because one of the indexes is "full" (due to _maxEntries == 0).
      warn(s"Trying to roll a new log segment for topic partition $topicPartition with start " +
It might be helpful to add index sizes to this message, just so we know when we see this why the roll was triggered.
Also nit: we can leave the topic partition out of this message since logIdent already includes it.
    } else {
      throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with " +
        s"start offset $newOffset while it already exists. Current " +
        s"active segment is $activeSegment.")
I think we expect the conflicting segment to be the active segment, but since the check segments.containsKey(newOffset) is more general, I wonder if we should use segments(newOffset) in this message instead?
    @@ -1572,12 +1572,28 @@ class Log(@volatile var dir: File,
      val offsetIdxFile = offsetIndexFile(dir, newOffset)
      val timeIdxFile = timeIndexFile(dir, newOffset)
      val txnIdxFile = transactionIndexFile(dir, newOffset)
Seems we only use these variables in the else case below. Perhaps we can move them there?
    val txnIdxFile = transactionIndexFile(dir, newOffset)
    for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
      warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first")
      Files.delete(file.toPath)
This logic was very strange given the former validation below. I think I had not actually appreciated how it affected the analysis of this bug. If we had hit a case where we attempted to roll a segment that already existed in the segments map, then the log and index files would have first been deleted by this logic and then recreated in LogSegment.open. Only then would we have seen the failure in the call to addSegment.
Is it possible that it was this sequence of events that caused us to observe an empty index file after the failure? In other words, could there be a cause other than an empty index that would trigger the unexpected roll?
It’s true that the bug caused the log and index files to be deleted and then recreated. However, in 2 out of the roughly 4-5 times this incident happened, I saw in the logs that the data log was loaded with 1 segment, log start offset X and log end offset X (log start == log end). Then, some time later, the first append happened at offset X, which caused the unexpected log roll and the exception. So we know that one of the causes is an empty index file. However, I don’t have proof that there are no other possible causes (in the other cases, I did not have full logs to confirm empty index files).
@hachikuji Thanks for your review. I addressed all your review comments.
    warn(s"Trying to roll a new log segment with start offset $newOffset while it already " +
      s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," +
      s" size of offset index: ${activeSegment.offsetIndex.entries}.")
    deleteSegment(activeSegment)
I guess another option would be to resize the index files. Did you feel this would be more reliable?
I am concerned about resizing the index, because it does not seem like the issue happens because we explicitly resize the index. It seems like something happens to the mmapped file, which we don't understand yet. So it seems more reliable to remove and recreate the segment.
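The delete-and-recreate choice discussed above can be sketched as a tiny in-memory model. This is a hypothetical stand-in, not the actual kafka.log.Log code: Segment and MiniLog are invented names, and only the collision rule from this PR is modeled.

```scala
import scala.collection.mutable

// Hypothetical simplified stand-in for a log segment.
final case class Segment(baseOffset: Long, var size: Int)

// Minimal model of the roll-collision rule from this PR: if the target base
// offset collides with an existing segment, recover only when that segment is
// the empty active segment; otherwise fail as before.
final class MiniLog {
  val segments = mutable.TreeMap.empty[Long, Segment]
  def activeSegment: Segment = segments.last._2

  def roll(newOffset: Long): Segment = {
    if (segments.contains(newOffset)) {
      if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) {
        // KAFKA-6388 recovery: delete the empty conflicting segment and recreate it.
        segments.remove(newOffset)
      } else {
        throw new IllegalStateException(
          s"Trying to roll a new log segment with start offset $newOffset while it already exists")
      }
    }
    val segment = Segment(newOffset, 0)
    segments(newOffset) = segment
    segment
  }
}
```

Rolling twice to the same offset on an empty active segment recovers silently; once the segment has data, the same collision still throws.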
      appendInfo.firstOffset match {
    -   case Some(firstOffset) => roll(firstOffset)
        case None => roll(maxOffsetInMessages - Integer.MAX_VALUE)
    +   case Some(firstOffset) => roll(Option(firstOffset))
nit: we can use Some(firstOffset) when we have an offset to provide.
right, was rushing, will fix.
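For context on the nit above: in Scala, Option(x) is a null-guarding constructor, while Some(x) asserts the value is present, so Some reads better when the value is statically known (as firstOffset is here). A generic illustration, not code from this PR:

```scala
object OptionVsSome {
  // Option(x) maps null to None; Some(x) always wraps. Use Option to guard a
  // possibly-null value, and Some to state intent for a known-present value.
  def wrapNullable(s: String): Option[String] = Option(s)

  def demo(): (Option[String], Option[String], Option[Long]) =
    (wrapNullable(null), wrapNullable("abc"), Some(42L))
}
```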
hachikuji
left a comment
Thanks, just a few small comments on the test case.
    assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)

    // roll active segment with the same base offset of size zero should recreate the segment
    log.roll(Option(0L))
nit: just a couple more of these to convert to Some
    assertEquals(0, readLog(log, 0, 100, Option(1)).records.batches.iterator.next().lastOffset)
    assertEquals(1, readLog(log, 1, 100, Option(2)).records.batches.iterator.next().lastOffset)

    // rolling non-zero size segment with the same base offset actually sets new segment's base offset
This comment is a little confusing. Is the intent to test the expected offset parameter passed to roll?
yeah, reading this comment again, it does sound confusing. Actually, this part of the unit test just rolls the log to make sure that the active segment has zero size. Updated the comment.
    // rolling active segment with the same base offset of size zero should still re-create the
    // active segment when we have more than one segment
    log.roll(Option(2L))
It might be useful to truncate one of the index files manually to simulate the case we're trying to detect. We can verify the index size following the roll.
Thanks @hachikuji ! I addressed your comments on the test cases.
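The truncation suggested above can be sketched independently of the Kafka test harness. An offset index stores fixed 8-byte entries, so a file truncated to zero bytes yields maxEntries == 0, the state in which an empty index reports itself as full. File names and helpers here are illustrative, not Kafka's OffsetIndex API:

```scala
import java.nio.file.{Files, StandardOpenOption}

object TruncatedIndexSketch {
  val EntrySize = 8 // 4-byte relative offset + 4-byte file position per entry

  def maxEntries(fileSizeBytes: Long): Long = fileSizeBytes / EntrySize
  def isFull(entries: Long, max: Long): Boolean = entries >= max

  def main(args: Array[String]): Unit = {
    val idx = Files.createTempFile("00000000000000000000", ".index")
    try {
      Files.write(idx, new Array[Byte](10 * EntrySize)) // a normally pre-allocated index
      assert(!isFull(0, maxEntries(Files.size(idx))))

      // Simulate the corruption: truncate the index file to zero length.
      val ch = Files.newByteChannel(idx, StandardOpenOption.WRITE)
      try ch.truncate(0) finally ch.close()
      assert(isFull(0, maxEntries(Files.size(idx)))) // 0 >= 0: "full" while empty
    } finally Files.delete(idx)
  }
}
```

After such a truncation, any append would trigger a roll even though the segment is empty, which is the scenario the new test is meant to cover.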
hachikuji
left a comment
LGTM. Thanks for the fix!
The test failure is unrelated, and was already observed in other PR builds: KAFKA-7540.
retest this please |
…#5986) There were several reported incidents where the log is rolled to a new segment with the same base offset as an active segment, causing KafkaException: Trying to roll a new log segment for topic partition X-N with start offset M while it already exists. In the cases we have seen, this happens to an empty log segment where there is a long idle time before the next append and somehow we get to a state where offsetIndex.isFull() returns true due to _maxEntries == 0. This PR recovers from this state by deleting and recreating the segment and all of its associated index files. Reviewers: Jason Gustafson <jason@confluent.io>
There were several reported incidents where the log is rolled to a new segment with the same base offset as an active segment, causing KafkaException: Trying to roll a new log segment for topic partition X-N with start offset M while it already exists. From what I have investigated so far, this happens to an empty log segment where there is a long idle time before the next append and somehow we get to a state where offsetIndex.isFull() returns true due to _maxEntries == 0. This PR recovers from the state where the active segment is empty and we try to roll to a new segment with the same offset: we delete the segment and recreate it.