KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response (#9382)
Conversation
Force-pushed from b655e46 to f08b429
Nit: Should this be the first check in the if() statement?
@rite2nikhil Thanks for the review. Did you mean changing this to:
if (isTruncationOnFetchSupported && initialFetchState.lastFetchedEpoch.nonEmpty && initialFetchState.offset >= 0)
Wondering if it might be better not to change this type since it is used in contexts where lastFetchedEpoch is not relevant. Following the types through here, we first use InitialFetchState in AbstractFetcherManager:

def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState])

We then convert to OffsetAndEpoch, which gets passed down to AbstractFetcherThread:

def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition]

Then this gets converted to PartitionFetchState. I wonder if it's possible to skip the conversion through OffsetAndEpoch and use InitialFetchState consistently? Perhaps the only reason the current code doesn't do that is that InitialFetchState includes the broker end point, which is not really relevant to the fetcher thread. Maybe that's not such a big deal?
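A minimal sketch of the consolidation suggested above, using hypothetical, simplified stand-ins for the real kafka.server classes (not the actual API):

```scala
// Toy stand-ins for the Kafka types discussed above.
case class BrokerEndPoint(id: Int, host: String, port: Int)
case class TopicPartition(topic: String, partition: Int)

// InitialFetchState carries the leader end point alongside the offset/epoch
// state; passing it all the way down avoids the lossy conversion through
// OffsetAndEpoch, at the cost of dragging the end point into the fetcher thread.
case class InitialFetchState(leader: BrokerEndPoint,
                             currentLeaderEpoch: Int,
                             initOffset: Long,
                             lastFetchedEpoch: Option[Int])

// Hypothetical addPartitions that accepts InitialFetchState directly and
// returns the set of partitions that were added.
def addPartitions(states: Map[TopicPartition, InitialFetchState]): Set[TopicPartition] =
  states.keySet
```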
I had initially added another class because I didn't want to change OffsetAndEpoch, but I removed that because it looked like too many similar classes. Your suggestion to use InitialFetchState sounds much better, updated.
nit: unnecessary parenthesis
Rather than doing an additional pass over the response partitions, would it be reasonable to build epochEndOffsets inline with the other error handling in processFetchRequest?
Can be done separately, but it would be nice to figure out how to move this logic into ReplicaAlterLogDirManager since this comment seems to only make sense if we assume this is the log dir fetcher and reconciliation with the leader has already completed.
In fact, I wonder if it is possible to get rid of this code entirely. If the log dir fetcher is also tracking lastFetchedEpoch, then we could rely on detecting truncation dynamically through ReplicaManager.fetchMessages instead of the current somewhat clumsy coordination with the replica fetcher.
Updated. The method is still there for older versions, but it is now disabled with IBP 2.7.
Below we only use currentState if the current epoch matches the initial epoch. Why is it safe to skip that check here?
I refactored this code a bit and added a check for Fetching state. Not sure if I have missed something though. I think we can continue to fetch without truncating if currentState is Fetching when lastFetchedEpoch is known. If we need to truncate, we will do that later when we get told about diverging epochs. Does that make sense?
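The reasoning above can be sketched as follows, under assumed names (these types are illustrative, not the real AbstractFetcherThread API): with truncation-on-fetch, a partition already in the Fetching state with a known lastFetchedEpoch keeps fetching, and truncates only when a later fetch response reports a diverging epoch.

```scala
sealed trait ReplicaState
case object Fetching extends ReplicaState
case object Truncating extends ReplicaState

def mustTruncateNow(currentState: ReplicaState,
                    lastFetchedEpoch: Option[Int],
                    divergingEpochReported: Boolean): Boolean =
  currentState match {
    // Keep fetching; truncation is deferred until the leader reports divergence.
    case Fetching if lastFetchedEpoch.isDefined => divergingEpochReported
    // Otherwise fall back to the pre-IBP-2.7 truncation-first path.
    case _ => true
  }
```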
Good catch here and in FetchSession. Do you think we should consider doing these fixes separately so that we can get them into 2.7? Otherwise it might be difficult to tie this behavior to the 2.7 IBP.
Makes sense, will submit another PR with just those changes.
PR with the changes in DelayedFetch and FetchSession: #9434
@hachikuji Thanks for the review, have addressed the comments so far.
Force-pushed from aec9e3f to cb099b7
Is it not possible that the InitialFetchState has a bump to the current leader epoch? We will still need the latest epoch in order to continue fetching.
Do we need to adjust this? I think we want to remain in the Fetching state if truncation detection is through Fetch.
This is a little unclear to me. I guess it is safe to reset lastFetchedEpoch as long as we reinitialize it after the next leader change. On the other hand, it seems safer to always retain the value.
Again it seems safe to keep lastFetchedEpoch in sync with the local log. If we have done a full truncation above, then lastFetchedEpoch will be None, but otherwise it seems like we should set it.
This doesn't seem right. The last fetched epoch is supposed to represent the epoch of the last fetched batch. The fetcher could be fetching the data from an older epoch here.
Force-pushed from cb099b7 to cd871b8
@hachikuji Thanks for the review, have addressed the comments.
Force-pushed from d32dca0 to 6a2495a
nit: not sure how much it matters, but maybe we can avoid the extra garbage and just use an integer until we're ready to build the result?
nit: line below is misaligned
I am wondering in what situation we would find currentState non-null here. The current logic in ReplicaManager.makeFollowers always calls removeFetcherForPartitions before adding the partition back. The reason I ask is that I wasn't sure we should be taking the last fetched epoch from the initial state or if we should keep the current one. It seems like the latter might be more current?
I couldn't come up with a case where currentState is non-null, so I removed that check. Haven't seen any test failures as a result, so hopefully that is ok.
This check is still a little hard to follow. I think we expect that if initOffset is negative, then lastFetchedEpoch will be empty and we will hit the fetchOffsetAndTruncate case below. Is that right? On the other hand, if lastFetchedEpoch is empty, then initOffset could still be non-negative if we have an old message format, which means we need to enter Truncating so that we can truncate to the high watermark.
One case that is not so clear is when currentState is non-null. Then we will enter the Truncating state below regardless of whether isTruncationOnFetchSupported is set. Is that what we want?
@hachikuji Thanks for the review! You mentioned in another comment below that we could use latestEpoch instead of using the epoch from InitialFetchOffset. I have removed lastFetchEpoch from InitialFetchOffset and updated this logic to use latestEpoch. It does look neater now, hope I haven't missed any cases. I had to update some of the unit tests which rely on the initial fetch epoch request to use IBP 2.6, but all tests have passed in my local run. I will kick off a system test run as well.
It's not clear to me why we set maySkipTruncation to false here. If the truncation is not complete, wouldn't that put us in the Truncating state?
Fixed, removed that flag.
I'm a little uncertain about this case. If we have truncated to an earlier offset, wouldn't we also need to reset last fetched epoch? I am thinking we should remove this check and modify the first one:

val (state, lastFetchedEpoch) = if (maySkipTruncation || offsetTruncationState.truncationCompleted)
  (Fetching, latestEpoch(topicPartition))

I might be missing something though.
I was trying to optimize for the case where lastFetchedEpoch didn't need to be reset, but you are right, it would need resetting after truncation. Updated to use latestEpoch(topicPartition).
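A toy model of the fix discussed above: once truncation completes, lastFetchedEpoch is re-derived from the local log via latestEpoch rather than carried over. The Batch/log types here are hypothetical stand-ins, not Kafka's Log class.

```scala
// Each batch in the toy log records its base offset and leader epoch.
final case class Batch(baseOffset: Long, epoch: Int)

// The latest epoch is simply the epoch of the last batch, if any.
def latestEpoch(log: Vector[Batch]): Option[Int] = log.lastOption.map(_.epoch)

// After a completed truncation we move to Fetching and reset lastFetchedEpoch
// from the (possibly shorter) local log; otherwise we stay Truncating.
def stateAfterTruncation(log: Vector[Batch],
                         truncationCompleted: Boolean): (String, Option[Int]) =
  if (truncationCompleted) ("Fetching", latestEpoch(log))
  else ("Truncating", None)
```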
Force-pushed from 6a2495a to 09c149b
@hachikuji Thanks for the reviews, addressed the comments.
Force-pushed from b551ba5 to 5f30c1a
Ran system tests on the latest version; there were 7 failures which look like flaky tests that also fail on trunk (one TransactionTest and 6 variants of ConnectDistributedTest).
hachikuji left a comment:
@rajinisivaram Thanks, I think we're getting close. I left a few small comments.
This is probably ok. I guess an alternative would be to not take the initial last fetched epoch from InitialFetchState, but instead use latestEpoch.
nit: I don't think we need this. We can override isTruncationOnFetchSupported with a val
Force-pushed from 5f30c1a to c9b36df
Force-pushed from c9b36df to 0c4c959
Force-pushed from 0c4c959 to bf976df
  fetchOffsetAndTruncate(tp, initialFetchState.currentLeaderEpoch)
} else if (isTruncationOnFetchSupported) {
  val lastFetchedEpoch = latestEpoch(tp)
  val state = if (lastFetchedEpoch.exists(_ != EpochEndOffset.UNDEFINED_EPOCH)) Fetching else Truncating
Hmm.. Do we actually return Some(EpochEndOffset.UNDEFINED_EPOCH) from latestEpoch? That seems surprising.
Might be worth a comment here that we still go through the Truncating state here when the message format is old.
I was being lazy with that check because we were using Some(EpochEndOffset.UNDEFINED_EPOCH) in AbstractFetcherThreadTest. I have updated the test and fixed the check. Added comment as well.
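An illustrative version of the corrected check: if latestEpoch returns None for the old message format (instead of Some(UNDEFINED_EPOCH)), the state decision reduces to a simple nonEmpty test. UndefinedEpoch mirrors EpochEndOffset.UNDEFINED_EPOCH; everything else below is a toy stand-in, not the real fetcher code.

```scala
val UndefinedEpoch = -1 // mirrors EpochEndOffset.UNDEFINED_EPOCH

// Normalize the sentinel into an Option so callers never see UNDEFINED_EPOCH.
def latestEpoch(rawEpoch: Int): Option[Int] =
  if (rawEpoch == UndefinedEpoch) None else Some(rawEpoch)

def initialState(lastFetchedEpoch: Option[Int]): String =
  // With an old message format there is no epoch, so we still go through
  // Truncating and truncate to the high watermark.
  if (lastFetchedEpoch.nonEmpty) "Fetching" else "Truncating"
```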
- val removedPartitions = thread.partitionsAndOffsets
- removeFetcherForPartitions(removedPartitions.keySet)
+ val removedPartitions = thread.removeAllPartitions()
+ removeFetcherForPartitions(removedPartitions.keySet) // clear state for removed partitions
This reads a bit odd following removeAllPartitions. I guess what we get from removeFetcherForPartitions is the clearing of failedPartitions and de-registration from fetcherLagStats. Not super important, but wonder if it's worth trying to consolidate a little. Maybe removeFetcherForPartitions could return the initial fetch offsets or something.
 * diverging epoch is returned in the response, avoiding the need for a separate
 * OffsetForLeaderEpoch request.
 */
private def initialFetchOffset(log: Log): Long = {
I think this could be saved for a follow-up, but I wonder if we should consider similarly letting the initial offset be determined by the fetcher thread on initialization rather than being passed in. I find it confusing that we expect this to be the high watermark in some cases. It seems a little slippery the way we rely on it in AbstractFetcherThread.truncateToHighWatermark.
Yes, makes sense, will do that in a follow-up PR.
val partitions = Set(t1p0, t1p1)

// Loop 1 -- both topic partitions skip epoch fetch and send fetch request since
// lastFetchedEpoch is set in initial fetch state.
@hachikuji Thanks for the review, I have addressed the comments.
The system test run from this morning on this PR branch completed with 6 failures, which are known failures in ConnectDistributedTest that also failed in the last trunk test run.
hachikuji left a comment:
@rajinisivaram LGTM. Thanks for the PR! Note I pushed a trivial tweak. I will merge once the build completes.
@hachikuji Thanks for all your help and patience in reviewing this PR! Merging to trunk.
Merge apache-github/trunk into …t-for-generated-requests

* apache-github/trunk:
  MINOR: Fix flaky test shouldQueryOnlyActivePartitionStoresByDefault (apache#9681)
  KAFKA-10799 AlterIsr utilizes ReplicaManager ISR metrics (apache#9677)
  MINOR: Fix KTable-KTable foreign-key join example (apache#9683)
  KAFKA-10473: Add docs on partition size-on-disk, and other log-related metrics (apache#9276)
  KAFKA-10739; Replace EpochEndOffset with automated protocol (apache#9630)
  KAFKA-10460: ReplicaListValidator format checking is incomplete (apache#9326)
  KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response (apache#9382)
  MINOR: Align the UID inside/outside container (apache#9652)
  KAFKA-10794 Replica leader election is too slow in the case of too many partitions (apache#9675)
  KAFKA-10090 Misleading warnings: The configuration was supplied but i… (apache#8826)

Conflicts:
  clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
  clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  core/src/test/scala/unit/kafka/server/epoch/util/ReplicaFetcherMockBlockingSend.scala
For IBP 2.7 onwards, fetch responses include the diverging epoch and offset if lastFetchedEpoch is provided in the fetch request. This PR uses that information for truncation and avoids the additional OffsetForLeaderEpoch requests in followers when lastFetchedEpoch is known.

Committer Checklist (excluded from commit message)
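A hedged, end-to-end sketch of the mechanism this PR adds for IBP 2.7+: the follower includes lastFetchedEpoch in its fetch request, and when the leader's log has diverged, the response carries the diverging epoch and its end offset, letting the follower truncate locally instead of issuing a separate OffsetForLeaderEpoch request. The names below are illustrative, not the real Kafka classes.

```scala
// The (epoch, endOffset) pair the leader reports when logs diverge.
final case class DivergingEpoch(epoch: Int, endOffset: Long)

// The follower truncates to the smaller of its own log end offset and the
// leader's end offset for the diverging epoch.
def truncationOffset(followerLogEndOffset: Long, diverging: DivergingEpoch): Long =
  math.min(followerLogEndOffset, diverging.endOffset)
```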