Skip to content

KAFKA-2073: migrate to client-side topic metadata request/response#988

Closed
hachikuji wants to merge 15 commits into
apache:trunkfrom
hachikuji:KAFKA-2073
Closed

KAFKA-2073: migrate to client-side topic metadata request/response#988
hachikuji wants to merge 15 commits into
apache:trunkfrom
hachikuji:KAFKA-2073

Conversation

@hachikuji
Copy link
Copy Markdown
Contributor

No description provided.

@hachikuji
Copy link
Copy Markdown
Contributor Author

cc @granthenke

Lots of cleanup needed to make this possible, but hopefully not too controversial. I tried to decouple the Cluster object from the MetadataResponse so that it was more of a "pure" response object and not too tied to client-side logic. Other than that, the main work was in MetadataCache and KafkaApis as would be expected.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: !errors.isEmpty()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

@granthenke
Copy link
Copy Markdown
Member

@hachikuji Overall I really like all the cleanup and decoupling. I just have a concern about compatibility. Its not likely to affect most users, but we may still want to be sure we maintain the old constructors, etc.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noting here for documentation sake that I think a decent amount of change could be done in the metadata cache still. But this can be handled as part of KAFKA-2969. Things like:

  • eliminate the use of PartitionStateInfo in the cache
  • eliminate the "bridge" method partitionStateToPartitionStateInfo
  • optimally build the data at update time so 'get' requests aren't as heavy
  • add some unit tests

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing that's not clear to me is whether we actually should use the request/response objects deep in the server code. Part of the problem is just that it's annoying to do the java/scala adapting all over the place, but you also have to carry around response errors, which leads to weird cases where you're not sure if you need to check the error or not. I was actually debating whether we should leave around some of the case classes (e.g. TopicMetadata) and just remove the serde functions. Then we could leave adapting to the request handling layer. That would leave optimizations such as building the response objects in cache off the table though.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hachikuji Agreed. I think its important to decouple the request/response from the internals. I think ideally KafkaApis would handle the translation once, and then everything "deeper' would work with server side objects. A good discussion for those "refactoring" jiras.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit surprised that we call apply on cache here. If we want to fail when this is called on a non-existent topic, it would be nicer to throw a more informative message than a generic NoSuchElementException.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code used to be nested in getTopicMetadata function and followed an explicit contains check. As a separate (albeit private) method, it might make more sense to make it safer by returning Option. I don't have a strong preference either way.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. We could also return an empty collection if the item is not in the cache. I don't feel strongly either. :)

@hachikuji
Copy link
Copy Markdown
Contributor Author

@ijuma @granthenke I added some unit tests for MetadataCache and tried to address Ismael's comments.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice refactoring. One minor improvement that you can do is to do the toSeq before the flatMap instead of after. That way you avoid adding everything to a Set (which checks for duplicates and is more expensive).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, sorry for these scattered comments. I'll hopefully do a complete review on Monday, so feel free to leave as is for now.

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Mar 8, 2016

LGTM, cc @gwenshap (on top of reducing tech debt, this also improves efficiency of MetadataCache and and its usage by KafkaApis).

@gwenshap
Copy link
Copy Markdown
Contributor

gwenshap commented Mar 8, 2016

Thanks for the ping. Its pretty large, so I'll need few hours to review it. I'll try to fit this in this week.

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Mar 8, 2016

Thanks. :)

@granthenke
Copy link
Copy Markdown
Member

LGTM. Thanks for all the work on this @hachikuji and the additional tests!

FYI: This needs to get in before I can update the Metadata response for KIP-4/KIP-36.

We are really close to migrating all of the Requests/Responses under the KAFKA-1927 umbrella.

@hachikuji
Copy link
Copy Markdown
Contributor Author

@ijuma @granthenke Thanks for the reviews!

@gwenshap I'm adding one more commit to remove the readFrom method as discussed above.

@hachikuji
Copy link
Copy Markdown
Contributor Author

Some testing from @fpj revealed a noticeable performance regression in the getTopicMetadata call. I did some benchmarking and found the fix above which appears to restore performance to pre-patch levels. Combined with the change in KafkaApis to remove the second call to getTopicMetadata, that "should" reduce topic metadata latency overall. That said, Flavio's testing suggests that getTopicMetadata can still cause a surprising amount of latency (e.g. 5-10ms) to the overall request overhead, so it might be worth considering @granthenke's suggestion above to cache the metadata responses themselves rather than doing the conversion for every metadata request.

@granthenke
Copy link
Copy Markdown
Member

@fpj Thanks for the testing! @hachikuji If we are back to pre-patch levels. I suggest we tackle that as a follow up. I created KAFKA-2969 to track that sort of work.

@hachikuji
Copy link
Copy Markdown
Contributor Author

@granthenke Sounds good to me.

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Mar 10, 2016

@hachikuji, can you please share the details of the benchmark and the results (maybe in a gist)?

@hachikuji
Copy link
Copy Markdown
Contributor Author

@ijuma Yes, happy to. I'll post it in the morning. I was actually hoping you could share some insight into the difference. Intuitively, the change above avoids a second pass over the broker collection, but in my tests, the replica/isr sets only ever had exactly two elements, so I wouldn't have guessed it would make much difference.

def addOrUpdatePartitionInfo(topic: String,
private def addOrUpdatePartitionInfo(topic: String,
partitionId: Int,
stateInfo: PartitionStateInfo) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no longer aligned after the last commit (I don't like this formatting approach because of this, but it is the Kafka way).

@fpj
Copy link
Copy Markdown
Contributor

fpj commented Mar 10, 2016

@hachikuji I tested with your latest changes and the latency is indeed half of what it was previously, so performance-wise this looks good.

For completeness, here is what I tested precisely. I have measured the latency of processing KafkaApis.getTopicMetadata in handleTopicMetadataRequest. I pre-create 1000 topics with 5 partitions each, and observe the latency of the topic metadata requests while issuing produce requests from the same producer to a random topic out of the 1000 created. This setup essentially issues a topic metadata request every time the producer hits an unknown topic. The latency of getTopicMetadata is currently around 4ms once the number of topics requested gets larger, which is pretty much what I get with 0.8.2. Before this latest change it was around 8-9ms with this branch.

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Mar 10, 2016

Good to know @fpj, thanks for checking.

@hachikuji
Copy link
Copy Markdown
Contributor Author

@ijuma For the record, here's a gist of the benchmark I used: https://gist.github.com/hachikuji/7a3b1df8a19d7f6e8dd0. I varied the parameters a bit to see what difference they made, but generally I saw results looking like this:

Before fix:

Benchmark                                    Mode  Cnt     Score    Error  Units
MetadataCacheBenchmark.getOldTopicMetadata  thrpt   20  1096.920 ± 48.125  ops/s
MetadataCacheBenchmark.getTopicMetadata     thrpt   20   760.778 ± 23.984  ops/s

After fix:

Benchmark                                    Mode  Cnt     Score    Error  Units
MetadataCacheBenchmark.getOldTopicMetadata  thrpt   20  1029.025 ± 52.546  ops/s
MetadataCacheBenchmark.getTopicMetadata     thrpt   20  1043.334 ± 24.005  ops/s

@fpj Thanks for confirming!

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Mar 11, 2016

Thanks @hachikuji. I had a look at this and I updated the benchmark a little:

https://gist.github.com/ijuma/da362d00ae45cf477e7e

Data points of interest:

val numTopics = 100
val numPartitions = 1000 // per topic
val numBrokers = 10
val querySize = 100

Compiled with Scala 2.11.8, executed with JDK 8 update 76:

[info] Benchmark                                Mode  Cnt   Score   Error  Units
[info] MetadataCacheBenchmark.getTopicMetadata  avgt   10  50.082 ± 6.523  ms/op

I tweaked things a little and got it down to less than half the time:

[info] Benchmark                                Mode  Cnt   Score   Error  Units
[info] MetadataCacheBenchmark.getTopicMetadata  avgt   10  20.170 ± 0.459  ms/op

A couple of observations:

  • The innermost statement of the innermost loop is executed (numTopics * numPartitions * aliveReplicas) + (numTopics * numPartitions * aliveIsr), which can be a relatively large number.
  • The new protocol classes do a bit more work than the old ones as they populate the Struct instance during construction.
  • It's actually not straightforward to cache the protocol responses as the security protocol is passed as a parameter. It would be easy to replace PartitionStateInfo with UpdateMetadataRequest.PartitionState, but this isn't much of a win in terms of performance.

Pull request is here:

hachikuji#1

Tests passed locally.

KAFKA-2073: Performance improvements
@hachikuji
Copy link
Copy Markdown
Contributor Author

Pulled in @ijuma's optimizations.

@gwenshap
Copy link
Copy Markdown
Contributor

LGTM.

Actually, looks great. I love the metadata cache refactoring and the response-object refactoring.

@asfgit asfgit closed this in 764d8ca Mar 11, 2016
@granthenke
Copy link
Copy Markdown
Member

Yeah this is a great patch. Thanks @hachikuji! (and others for review, perf tests, etc)

efeg pushed a commit to efeg/kafka that referenced this pull request Jan 29, 2020
wcarlson5 pushed a commit to wcarlson5/kafka that referenced this pull request Feb 6, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants