KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs#8812
ijuma merged 4 commits into apache:trunk
Conversation
Tests still have to be written.
@junrao does this seem right? If we don't have any segments, we should be able to completely truncate the producer state manager, right?
Hmm, not sure about this. After KIP-360, we try to retain producer state as long as possible even when the corresponding entries have been removed from the log. However, we're in a strange state given that some of the later segments were apparently removed. Perhaps it is safer to treat this more like a new replica which is starting from scratch.
Yeah, that's what I was thinking. What's the best way to achieve this (treat this more like a new replica which is starting from scratch)?
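One way to picture "treat this like a new replica starting from scratch" is wiping the retained producer entries and restarting the state at a given offset. The following is a hypothetical Java model for illustration only — it is not Kafka's ProducerStateManager, though Kafka does have a truncateFullyAndStartAt-style operation:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: dropping all retained producer state and restarting
// at a new offset, as a brand-new replica would. Not Kafka's real class.
class ProducerStateSketch {
    private final Map<Long, Long> lastSequencePerProducer = new HashMap<>();
    private long mapEndOffset;

    // Record the latest sequence seen for a producer id.
    void record(long producerId, long sequence) {
        lastSequencePerProducer.put(producerId, sequence);
    }

    // Forget everything and restart the state at the given offset.
    void truncateFullyAndStartAt(long offset) {
        lastSequencePerProducer.clear();
        mapEndOffset = offset;
    }

    boolean isEmpty() { return lastSequencePerProducer.isEmpty(); }
    long mapEndOffset() { return mapEndOffset; }
}
```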
rite2nikhil left a comment
Do we still need to flush logs after recovery? https://github.com/apache/kafka/blob/7af645b8e79ca535ce2df202c4fe8ea400a8a60e/core/src/main/scala/kafka/log/Log.scala#L804
Force-pushed 7af645b to 0b2f271
@rite2nikhil We don't need to flush the segments since we don't increment the recovery point. However, I'm not sure — @junrao, can you please clarify?
nit: this is a big initializer. Are there parts we could move to a method?
I guess it is because of the semantics of DeleteRecords that we trust the checkpoint over the segment data. Might be worth a comment about that since it is a bit surprising.
I was also surprised, so I agree. :) Will do.
Just checking, but the issue here is that we might mistakenly mark the directory as offline if the clean shutdown file did not exist?
File.delete doesn't throw an exception and we don't check the result. So the previous code was very misleading. That's what I was trying to fix. And to avoid introducing an issue if the file did not exist, I am using deleteIfExists.
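To illustrate the difference in a hedged Java sketch (the helper name is hypothetical; only File.delete and Files.deleteIfExists are real JDK APIs): File.delete returns false both when the file is absent and when the delete genuinely fails, whereas Files.deleteIfExists returns false for an absent file but throws IOException on a real failure, which is the case that should take the directory offline.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper showing the fixed semantics: an already-absent
// marker file is harmless, a real I/O failure is not.
class CleanShutdownFile {
    // Returns true if the log dir can stay online after removing the marker.
    static boolean tryRemove(Path marker) {
        try {
            Files.deleteIfExists(marker); // false here just means "already gone"
            return true;
        } catch (IOException e) {
            return false; // real I/O failure: mark the directory offline
        }
    }
}
```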
Hmm, logEndOffset is defined by nextOffsetMetadata, which is initialized after loadSegments returns. But recoverLog is called within loadSegments. So does this check work as expected or am I missing something?
I meant to use logEndOffsetOption, so this is a bug. :) It probably indicates that the variable name is bad (I kept it from before). If I had used the right variable, it would be:

    def readNextOffset: Long = {
      val fetchData = read(offsetIndex.lastOffset, log.sizeInBytes)
      if (fetchData == null)
        baseOffset
      else
        fetchData.records.batches.asScala.lastOption
          .map(_.nextOffset)
          .getOrElse(baseOffset)
    }

The idea is that if we delete a bunch of segments, then the recovery point we passed to the Log constructor could be ahead of what remains.
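The clamping this implies can be sketched in a couple of lines of Java (method and parameter names are stand-ins for illustration, not Kafka's actual Log code):

```java
// Minimal sketch: after segment deletion, the checkpointed recovery point
// may point past the data that remains on disk, so it is clamped to the
// recomputed log end offset. Names are illustrative.
class RecoveryPointClamp {
    static long clamp(long checkpointedRecoveryPoint, long logEndOffset) {
        return Math.min(checkpointedRecoveryPoint, logEndOffset);
    }
}
```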
Can you help me understand what was wrong with this?
Jun explained in the JIRA: the concern is that if there is a hard failure during recovery, you could end up with a situation where we persisted this, but we did not flush some of the segments. Does that make sense?
Got it. So we may still be able to reopen an unflushed segment. That makes sense.
Is it possible that a consumer could see "phantom" messages after recovery, even with this change?
1. Kafka process dies with log data in the page cache but not fsync'd
2. Recovery sees the un-fsync'd log data, but it looks OK, so recovery succeeds with nothing to do
3. Consumer fetches this data
4. OS hard dies, losing the page cache
5. Broker is restarted and the consumer tries to repeat the fetch from the same offset, but the data is gone
It seems to me once recovery has run we should be sure that all log segments are persistently stored. I'm not sure if we're currently providing that guarantee. It would be pretty simple just to fsync each segment we recover.
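The suggested per-segment fsync could look roughly like this in Java (the helper is hypothetical; FileChannel.force is the underlying JDK call such a flush would rely on):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of the suggestion above: after recovering a segment,
// force its contents to stable storage before serving from it.
class SegmentFlush {
    static void flush(Path segmentFile) throws IOException {
        try (FileChannel channel = FileChannel.open(segmentFile, StandardOpenOption.WRITE)) {
            channel.force(true); // fsync file data and metadata
        }
    }
}
```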
@purplefox To clarify, the case you are talking about is:
- A catastrophic scenario where a partition is offline (i.e. all replicas are down)
- The OS was not shut down
- The OS did not flush the data to disk (this typically happens irrespective of our flushes due to OS configurations)
- The replica that was the last member of the ISR comes back up, registers to ZK, and the Controller makes it the leader (since it was the last member of the ISR; a different replica would not be given leadership without unclean leader election)
- The hw is beyond the flushed segments
- Consumer fetches the data beyond the flushed segments
- OS hard dies

This is an interesting edge case. It seems incredibly unlikely, but it is possible if the hw can be beyond the flushed segments. @junrao & @hachikuji are we missing any detail that protects us against this?
The following has a similar impact:
- Leader accepts a write
- Write is replicated, hw is incremented, but data is not flushed
- All replicas die, but the ISR is not shrunk yet
- Leader receives a write, accepts it, replication doesn't happen since replicas are gone
- ISR is shrunk, hw is incremented
- Producer won't receive a successful ack given min.isr=2, but the consumer reads data that is only in the leader
- Leader crashes and the unflushed data is gone (or hard disk dies and all the data in the leader is gone)
Flushing the segments during recovery helps in some scenarios, but not the one I just mentioned (assuming I am not missing anything). @hachikuji had a "strict min isr" proposal where the ISR is never allowed to shrink below min.isr. I haven't thought through all the details, but perhaps that covers both issues. Thoughts @hachikuji?
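The "strict min isr" idea can be reduced to a one-line invariant (a hedged Java sketch with hypothetical names, not an actual Kafka implementation): the leader refuses any ISR shrink that would drop below the configured minimum.

```java
// Hedged sketch of the "strict min isr" proposal: shrinking the ISR is
// only allowed while the result still meets min.insync.replicas.
class StrictMinIsr {
    static boolean canRemoveFromIsr(int currentIsrSize, int minInsyncReplicas) {
        return currentIsrSize - 1 >= minInsyncReplicas;
    }
}
```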
For the case that Tim mentioned, if we defer advancing the recovery point, at step 5, the broker will be forced to do log recovery for all unflushed data. If the data is corrupted on disk, it will be detected during recovery.
For the other case that Ismael mentioned, it is true that data can be lost in that case, but then this is the case where all replicas have failed.
I think it is a gap that there is no minimum replication factor before a write can get exposed. Any writes that end up seeing the NOT_ENOUGH_REPLICAS_AFTER_APPEND error code are more vulnerable. These are unacknowledged writes, and the producer is expected to retry, but the consumer can still read them once the ISR shrinks and we would still view it as "data loss" if the broker failed before they could be flushed to disk. With the "strict min isr" proposal, the leader is not allowed to shrink the ISR lower than some replication factor, which helps to plug this hole.
Going back to @purplefox's suggestion, it does seem like a good idea to flush segments beyond the recovery point during recovery. It kind of serves to constrain the initial state of the system which makes it easier to reason about (e.g. you only need to worry about the loss of unflushed data from the last restart). Some of the flush weaknesses probably still exist though regardless of this change.
We discussed this offline and we decided to stick with the fix in this PR for now and to file a separate JIRA to consider flushing unflushed segments during recovery. That would provide stronger guarantees after a restart.
@ijuma : We don't need to flush leaderEpochCache after segment recovery since new leader epochs are added through LeaderEpochFileCache.assign() which does flushing already.
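A toy model of that flushing behavior (hypothetical names, not the real LeaderEpochFileCache API): an assign()-style method persists exactly when a new epoch entry is actually appended, so a separate flush after recovery adds nothing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: each newly assigned epoch is flushed as it is added;
// re-assigning the current epoch is a no-op and triggers no flush.
class EpochCacheSketch {
    static final class Entry {
        final int epoch;
        final long startOffset;
        Entry(int epoch, long startOffset) { this.epoch = epoch; this.startOffset = startOffset; }
    }

    private final List<Entry> epochs = new ArrayList<>();
    int flushCount = 0; // stands in for writes to the checkpoint file

    void assign(int epoch, long startOffset) {
        if (epochs.isEmpty() || epochs.get(epochs.size() - 1).epoch != epoch) {
            epochs.add(new Entry(epoch, startOffset));
            flushCount++; // flushed at assignment time
        }
    }
}
```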
1. Don't advance the recovery point in `recoverLog` unless there was a clean shutdown.
2. Ensure the recovery point is not ahead of the log end offset.
3. Clean and flush the leader epoch cache and truncate the producer state manager if deleting segments because the log end offset is smaller than the log start offset.
4. If we are unable to delete a clean shutdown file that exists, mark the directory as offline (this was the intent, but the code was wrong).
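Points 1 and 2 above can be sketched together in a few lines of Java (a simplified model with stand-in names, not Kafka's actual Log internals):

```java
// Simplified model: advance the recovery point only after a clean shutdown,
// so a hard failure forces re-recovery of unflushed data, and never let the
// recovery point get ahead of the log end offset.
class RecoverLogSketch {
    static long nextRecoveryPoint(boolean hadCleanShutdown,
                                  long currentRecoveryPoint,
                                  long logEndOffset) {
        long candidate = hadCleanShutdown ? logEndOffset : currentRecoveryPoint;
        return Math.min(candidate, logEndOffset);
    }
}
```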
Force-pushed 0b2f271 to 91de172
Force-pushed c02bc3c to 46d5cef
KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs (#8812)

1. Don't advance the recovery point in `recoverLog` unless there was a clean shutdown.
2. Ensure the recovery point is not ahead of the log end offset.
3. Clean and flush the leader epoch cache and truncate the producer state manager if deleting segments because the log end offset is smaller than the log start offset.
4. If we are unable to delete a clean shutdown file that exists, mark the directory as offline (this was the intent, but the code was wrong).

Updated one test that was failing after this change to verify the new behavior.

Reviewers: Jun Rao <junrao@gmail.com>, Jason Gustafson <jason@confluent.io>
Tests passed, merged to trunk and 2.8.