MINOR: Next round of fetcher thread consolidation #5587
hachikuji merged 7 commits into apache:trunk from
Conversation
Am I missing something or did we drop the usage of sessionPartitions? Was that not needed?
Nope, you're right. I misunderstood the usage.
retest this please

@hachikuji I tried running a build locally, but see a lot of integration test failures. Do they work for you?

@rajinisivaram Sorry about that. I must have broken something in the recent push. I'll take a look.

@rajinisivaram I pushed a couple of fixes. Tests are passing locally.
Force-pushed 5597101 to 27c4309
retest this please
rajinisivaram
left a comment
@hachikuji Thanks for the PR. Left a minor question (just for my understanding). LGTM.
private def processFetchRequest(fetchRequest: REQ) {

private def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
  val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
nit: don't need the `scala.collection.` prefix?
.forReplica(fetchRequestVersion, replicaId, maxWait, minBytes, fetchData.toSend)
  .setMaxBytes(maxBytes)
  .toForget(fetchData.toForget)

if (fetchMetadataSupported) {
Is this check not required? I wasn't sure why the check was there before.
I couldn't figure out why we needed it either. If the fetch version doesn't support it, then the metadata will be ignored. It was odd to check for metadata support, but not the toForget partitions.
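To illustrate the inconsistency discussed above, here is a minimal Java sketch of a version-gated request builder. All names (`FetchRequestBuilder`, `SESSION_METADATA_MIN_VERSION`, the string-based `build()`) are hypothetical stand-ins, not Kafka's actual API: the point is that gating session metadata on the request version while leaving `toForget` ungated is inconsistent, and if older versions simply omit unsupported fields during serialization, the explicit check is redundant.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Kafka's real FetchRequest.Builder.
public class FetchRequestBuilder {
    // Illustrative version threshold for fetch-session fields.
    static final short SESSION_METADATA_MIN_VERSION = 7;

    private final short version;
    private String sessionMetadata = null;
    private final List<String> toForget = new ArrayList<>();

    FetchRequestBuilder(short version) { this.version = version; }

    FetchRequestBuilder metadata(String metadata) {
        // Set unconditionally; the version gate lives in build(), mirroring the
        // observation that serialization can just drop unsupported fields.
        this.sessionMetadata = metadata;
        return this;
    }

    FetchRequestBuilder toForget(List<String> partitions) {
        this.toForget.addAll(partitions);
        return this;
    }

    String build() {
        StringBuilder sb = new StringBuilder("FetchRequest(v" + version);
        // Both session fields are gated the same way, removing the asymmetry
        // where only metadata was checked but toForget was not.
        if (version >= SESSION_METADATA_MIN_VERSION && sessionMetadata != null)
            sb.append(", metadata=").append(sessionMetadata);
        if (version >= SESSION_METADATA_MIN_VERSION && !toForget.isEmpty())
            sb.append(", toForget=").append(toForget);
        return sb.append(")").toString();
    }
}
```

With this shape, callers never need a `fetchMetadataSupported` check of their own: an old-version builder silently drops both session fields.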
rajinisivaram
left a comment
@hachikuji Thanks for the explanation, LGTM
@rajinisivaram Thanks for the review. The test failure is unrelated; I submitted a separate fix for it in #5595. I will go ahead and merge.

Nice clean-up! One remaining item that didn't look nice was the
junrao
left a comment
@hachikuji : Sorry for the late review. Added one more comment below.
    currentPartitionFetchState.isReadyForFetch) {
// In this case, we only want to process the fetch response if the partition state is ready for fetch and
// the current offset is the same as the offset requested.
val fetchOffset = fetchStates(topicPartition).fetchOffset
There is actually a reason why we fetch the partition state directly from partitionStates here again under the partitionMapLock. partitionStates can be modified after we release the lock in doWork() (e.g., due to a leadership change). When we get to processFetchRequest(), a replica may have become the leader and therefore have been removed from partitionStates. In this case, we don't want to append the pending fetched data to the log, since this partition could have appended new data from the producer to the log. This may cause the leader epoch to be out of order in the log.
On another look, the change is fine. We still check partitionStates in processFetchRequest().
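The re-validation pattern described in this thread can be sketched as follows. This is a deliberately simplified Java model (class and method names are hypothetical, and the "state" is just a partition-to-offset map, not Kafka's PartitionFetchState): the fetcher captures the offset it requested, and when the response arrives it re-reads the current state under the lock, dropping the fetched data if the partition was removed in the meantime (e.g. the replica became leader) or the fetch offset moved.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of re-checking partition state under the lock
// before applying a fetch response; not Kafka's AbstractFetcherThread.
public class FetcherSketch {
    private final ReentrantLock partitionMapLock = new ReentrantLock();
    // partition name -> next offset to fetch (stand-in for PartitionFetchState)
    private final Map<String, Long> partitionStates = new HashMap<>();

    void addPartition(String tp, long offset) {
        partitionMapLock.lock();
        try { partitionStates.put(tp, offset); } finally { partitionMapLock.unlock(); }
    }

    void removePartition(String tp) {
        partitionMapLock.lock();
        try { partitionStates.remove(tp); } finally { partitionMapLock.unlock(); }
    }

    /**
     * Called with the offset captured when the fetch was sent. Re-reads the
     * state under partitionMapLock: if the partition was removed while the lock
     * was released (leadership change) or its offset changed, the fetched data
     * is dropped instead of appended, so stale records never land behind newer
     * producer writes (which could put leader epochs out of order in the log).
     */
    boolean processFetchResponse(String tp, long requestedOffset) {
        partitionMapLock.lock();
        try {
            Long current = partitionStates.get(tp);
            if (current == null || current != requestedOffset)
                return false; // state changed since the fetch was issued: skip
            partitionStates.put(tp, requestedOffset + 1); // stand-in for appending a batch
            return true;
        } finally {
            partitionMapLock.unlock();
        }
    }
}
```

The key point from the thread is preserved: the response handler never trusts the state captured before the lock was released, and it re-checks under the same lock that guards modifications.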
Pull the epoch request build logic up to `AbstractFetcherThread`. Also get rid of the `FetchRequest` indirection. Reviewers: Ismael Juma <ismael@juma.me.uk>, Rajini Sivaram <rajinisivaram@googlemail.com>