KAFKA-6254: Incremental fetch requests #4418
Conversation
Force-pushed from cad085b to c2d258b
Should the return type of the method be Unit?
Has the return type been changed to Unit yet?
It's a bit unintuitive to use 0 maxBytes as an indication for removal.
This seems similar to setting everything to 0 when there is a partition error, right? It would be clearer if our RPC layer supported a more advanced type system.
Since all the callers are already synchronizing on the session object, do we need to synchronize here?
It's not technically needed, but it makes the code much clearer because the locking is consistent. It should also have very small overhead.
For consistency, perhaps it's better to either add local to all offsets or leave it out for all.
The main reason it's on this one is to distinguish from fetcherLogStartOffset (the LSO of the follower, which is different from ours). Maybe I should add "local" to all of them, though?
Should we test for !verifyFullFetchResponseParts() here and in the check on line 289?
FetchType could also be SESSIONLESS. Should we check that?
SESSIONLESS should be handled the same way as FULL. Let me fix this.
It seems that we can get here if FetchType is SESSIONLESS. In this case, it seems that we want to use the ordering of partitions in next to achieve fairness when there is more data to give than the max fetch response size?
The ordering should be maintained, since FetchSessionHandler#Builder#next is a LinkedHashMap. I guess there should be a comment about this in the code, so that it's documented.
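Since the answer above leans on LinkedHashMap's insertion order, here is a small illustrative sketch (class and partition names are made up; this is not the actual FetchSessionHandler code) showing that the insertion order survives even when the map is wrapped in an unmodifiable view:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: insertion order is preserved by LinkedHashMap iteration and is not
// disturbed by Collections.unmodifiableMap, which is why a builder can expose
// the plain Map type while callers still get a deterministic partition order.
public class FetchOrderSketch {
    public static Map<String, Long> buildNext() {
        Map<String, Long> next = new LinkedHashMap<>();
        next.put("topicA-0", 100L);   // hypothetical partition -> fetch offset
        next.put("topicB-1", 200L);
        next.put("topicA-1", 300L);
        // The wrapper delegates iteration to the underlying LinkedHashMap.
        return Collections.unmodifiableMap(next);
    }

    public static void main(String[] args) {
        Map<String, Long> next = buildNext();
        // Iteration visits entries in the order they were inserted above.
        System.out.println("first partition in fetch order: "
                + next.keySet().iterator().next());
    }
}
```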
This probably needs to be reverted?
Rebased on changes to
Force-pushed from 50f06aa to 33cea3b
Force-pushed from 5dbcd59 to e5310fc
Rebased on trunk
Force-pushed from a9ec0e0 to 5af21fc
nit: These are the same descriptions as above. How about creating static Field instances, or at least extracting the message?
max_bytes does have different doc strings in different message versions, though. I started looking at adding more constants for this, but it got a bit messy; maybe a good follow-on change?
@cmccabe : Thanks for the patch. I only had time to review part of it; the following are my comments so far.
hachikuji left a comment:
Did a quick pass over the client code and had a few questions/comments.
Should this be retriable? Same question for FetchSessionIdNotFoundException.
OK, let's make it retriable.
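As a rough sketch of what making the error retriable looks like (using a stand-in for Kafka's actual org.apache.kafka.common.errors.RetriableException base class; the nesting and constructors here are illustrative only):

```java
// Sketch: an exception type becomes "retriable" simply by extending the
// RetriableException marker class, which callers check to decide whether
// to retry. Stand-in classes, not the real Kafka hierarchy.
public class RetriableSketch {
    static class RetriableException extends RuntimeException {
        RetriableException(String message) { super(message); }
    }

    // Marking session-not-found as retriable lets the consumer fall back to
    // a full fetch request on the next poll instead of surfacing a failure.
    static class FetchSessionIdNotFoundException extends RetriableException {
        FetchSessionIdNotFoundException(String message) { super(message); }
    }

    public static void main(String[] args) {
        Exception e = new FetchSessionIdNotFoundException("session 123 not found");
        // A caller can test the marker type to decide whether to retry.
        System.out.println(e instanceof RetriableException);
    }
}
```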
Perhaps we may as well list out all the partitions?
As mentioned in the comment above, though, there are going to be a huge number of them, so it's not really practical (except maybe at TRACE level).
Same as above. It will probably be particularly useful for incremental fetches to have the partitions explicitly in the log message.
nit: can we spell out partitions?
I'm not sure this is a good idea. If we're unlucky, the partition we're interested in may not be listed. Since this is an exceptional case anyway, I would suggest using the more verbose message.
OK. If there is an error, we can log all partitions, to make it easier to debug.
Maybe the name can be more explicit? For example, forgetPartitions?
I wanted a name that indicated that we want to forget the partitions, but that it hasn't been done yet. I'm open to suggestions, but toForget seemed nice and simple.
This message should refer to all the partitions in the fetch session, right?
I am wondering if this can be lowered to DEBUG since it is handled internally.
I think it makes sense to log since it's a pretty rare occurrence. And if it does start happening a lot, that could indicate a problem.
We lost the comment we had before, but it seemed useful. Maybe you can update it to be relevant to the new logic.
Good point. I will add a log message to FetchSessionHandler which will spell out this information.
I was expecting to see some logic to remove a partition from the session following a NOT_LEADER error. Maybe I'm missing it somewhere?
I think the optimization of using array indices instead of pointers is a bit questionable without some benchmarks. Heaps larger than 32 GB are rarely (or never) used in Kafka. And having to go via the array has some cost as well.
There are other benefits besides reducing the pointer size. When you use array indices rather than pointers, the garbage collector needs to do less work chasing pointers. See https://issues.apache.org/jira/secure/attachment/12701400/BlocksMap%20redesign.pdf .
Excerpt:
According to an Oracle engineer, large heaps with reference dense objects in old gen with frequently mutating references is brutally hard on GC. When a reference in an old gen object is mutated, the object’s “card page” is marked as dirty. During young gen collection all references in dirty old gen card pages are used as roots for determining reachability of young gen objects.
The [HDFS] block data-structure mutates by necessity, but it does so in a non-GC friendly manner. Report processing inserts a delimiter into the storage’s doubly linked list, moves reported blocks to the head of the storage’s list, then uses the delimiter to determine excess blocks for invalidation. The updating of so many references creates intense pressure on GC.
One reason is young gen maintains a tenuring threshold equating to how conservatively it will promote young gen objects into old gen. The threshold drops relative to the rate of garbage creation and dirtying of old gen cards. The young collector may resort to prematurely promoting objects into old gen when it becomes overrun by spending too much time collecting. CMS is forced to cleanup when the old gen occupancy threshold is exceeded. The prematurely promoted objects lead to excessive fragmentation of old gen.
We can reduce abusive GC behavior by reducing the mutation of references in old gen.
Unlike references, updating primitives (ints, longs, etc) does not mark an old gen page dirty. It does not incur a penalty to young gen collection.
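The excerpt's point can be sketched with a toy index-linked list (illustrative only, not the actual ImplicitLinkedHashSet implementation in the patch): linkage lives in primitive int arrays rather than in reference fields, so relinking writes only primitives and never dirties an old-gen card page.

```java
// Toy sketch of linking via array indices instead of object references.
public class IndexListSketch {
    private final int[] next;  // next[i] = index of the element after i, -1 if none
    private final int[] prev;  // prev[i] = index of the element before i, -1 if none
    private int head = -1;     // index of the first element, -1 if the list is empty

    IndexListSketch(int capacity) {
        next = new int[capacity];
        prev = new int[capacity];
    }

    // Insert slot i at the head of the list using only primitive int writes,
    // which do not mark old-gen card pages dirty the way reference writes do.
    void addFirst(int i) {
        next[i] = head;
        prev[i] = -1;
        if (head != -1) {
            prev[head] = i;
        }
        head = i;
    }

    int head() { return head; }

    int nextOf(int i) { return next[i]; }

    public static void main(String[] args) {
        IndexListSketch list = new IndexListSketch(4);
        list.addFirst(2);
        list.addFirst(0);
        System.out.println("head slot: " + list.head());
    }
}
```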
The fact that we are allocating an array and potentially an Integer to compute the hashCode is suboptimal given that these elements are meant to be added to the ImplicitLinkedHashSet, which doesn't seem to cache hash codes.
Note that Hashtable uses 11 (a prime number) as the default.
I will change this to 5, so that we also get 11 as the default number of slots.
Force-pushed from 3140801 to 1c1697c
Refer to this link for build results (access rights to CI server needed):
log doesn't seem to be used.
Do we need to store toSend here?
From the KIP wiki, it seems that legacy request should use 0 as the epoch, not -1?
Yeah. I posted a correction about this. The correct way is now id = 0, epoch = -1 (previously it was documented as id = -1, epoch = 0).
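A minimal sketch of the corrected convention from the reply above (class, field, and constant names here are illustrative, not necessarily those in the final patch):

```java
// Sketch: a legacy (sessionless) fetch request carries session id 0 and
// epoch -1, per the corrected KIP-227 convention discussed above.
public class FetchMetadataSketch {
    static final int INVALID_SESSION_ID = 0;  // "no session" sentinel
    static final int FINAL_EPOCH = -1;        // epoch used by legacy requests

    final int sessionId;
    final int epoch;

    FetchMetadataSketch(int sessionId, int epoch) {
        this.sessionId = sessionId;
        this.epoch = epoch;
    }

    /** Metadata for a legacy request: id = 0, epoch = -1. */
    static FetchMetadataSketch legacy() {
        return new FetchMetadataSketch(INVALID_SESSION_ID, FINAL_EPOCH);
    }

    boolean isLegacy() {
        return sessionId == INVALID_SESSION_ID && epoch == FINAL_EPOCH;
    }

    public static void main(String[] args) {
        System.out.println(legacy().isLegacy());
    }
}
```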
Hmm, the caller of this method doesn't seem to be synchronized on the CachedPartition object. Do we have a visibility issue across threads?
Oops, the comment is wrong. That should read "the appropriate session lock." Fixed.
Since topicPartition doesn't exist in next if we get here, there is no need to remove it.
Since the caller always passes in a LinkedHashMap, is there a reason to change this to Map?
The rationale is that FetchSessionHandler wraps the LinkedHashMap in an UnmodifiableMap, so the type is technically UnmodifiableMap rather than LinkedHashMap at that point. Also, there are things like using Collections.emptyMap in a unit test, which you can't use if a LinkedHashMap is required.
I will add a comment saying that iteration order is significant, though.
Could we just test set equality instead of string equality?
Hmm, are we supposed to test data? Should we build a new request?
In this case, it was intentional to skip building a new request. I'll add a comment to make it clearer (also, we don't need to test data#toSend again).
The failed test on the jdk7 run is
junrao left a comment:
@cmccabe : Thanks for the patch. Looks good to me. Just a couple of minor comments.
@ijuma and @hachikuji : Do you want to take another look?
On the server side, we have moved to the s string-interpolation convention for building strings, instead of format.
OK. I will change it over to the 's' convention.
I thought we agreed in the KIP that this would be a constant and not configurable?
Good catch, will make this a constant.
Implement incremental fetch requests as described by KIP-227.
retest this please
@cmccabe : Thanks for the update. The latest code LGTM. Do you have any performance results? It would be useful to see (1) the consumption improvement when there are idle topics, and (2) no degradation when caching is disabled.
@junrao I've been testing @cmccabe's patches. One thing that was important to us was the consumption latency, which we define as (time to consume a series of 100 small messages + time to commit an offset). With Kafka 1.0 and trunk, we'd see that latency exceed our SLA of 50 ms after 40-46k 3x-replicated partitions. With Colin's patch, at fa01cf98 (before rebase), we were able to get to 68k 3x-replicated partitions with a latency of 35 ms. Generally, the offset commit latency is far higher than the consume latency: 33 ms for the latter and 46 ms for the former. I couldn't push past 68k replicated partitions due to https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-6469?filter=allopenissues Let me know if you'd like me to get more results and share additional metrics. We're very excited about this patch!
@afalko : Thanks for sharing the results. Very helpful. Just to clarify, are you saying the offset commit latency is 33 ms without this patch and 46 ms with the patch?
@afalko : That's interesting. This patch doesn't really optimize the offset commit protocol, so I am wondering why there is an improvement on offset commit.
The test failure on JDK9 seems to be related to some ZooKeeper issues going on, judging from the logs. I don't think this is related to the patch at all. I will re-run the tests to see if we can get a clean run this time.
retest this please
@afalko: thanks again for your great work testing this. @junrao wrote:
Yeah, that is interesting. After all, we are handling the same number of partitions on the broker; we are just not serializing them into every RPC like we did before. So I would expect the offset commit improvement to come from better-behaved garbage collection or better network utilization. Probably network utilization, since the patch doesn't make many special efforts to optimize GC (although I made one here and there, for example using iterators instead of copying a map in one place).
Offset commits depend on replication, so any improvement to fetch overhead could reduce offset commit latency. If the result is actually meaningful, I would expect to see a similar improvement in produce latency.
@hachikuji : Great point. That makes sense.
Thanks @hachikuji, @junrao, @cmccabe. Fresh off the open-source presses, I've been able to open-source the test I wrote that measured the results I mentioned: https://github.com/salesforce/kafka-partition-availability-benchmark I plan to expand it to have another mode where it produces continuously without resetting offsets. That will be able to measure produce latency.
@afalko : Thanks. You may want to link that to the JIRA so that other people know how your tests were done.
Author: Colin P. Mccabe <cmccabe@confluent.io>
Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
Closes #4418 from cmccabe/KAFKA-6254