KAFKA-12410 KafkaApis ought to group fetch data before generating fet… #10269

Closed

chia7712 wants to merge 10 commits into apache:trunk from chia7712:KAFKA-12410

Conversation

chia7712 (Member) commented Mar 5, 2021

The fetch data generated by KafkaApis is re-grouped when it is converted to a FetchResponse. That re-grouping is unnecessary, since KafkaApis can keep the fetch data in a grouped collection from the start. The other main changes are listed below.

  1. remove FetchResponse#of
  2. remove useless constructor from FetchResponse
  3. remove PartitionIterator
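The core idea — collecting fetch data per topic in encounter order, so the response layer never has to re-group it — can be sketched as below. The types `PartitionData` and `TopicResponse` here are simplified, hypothetical stand-ins for Kafka's generated `FetchResponseData` classes, not the real ones.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupedFetchSketch {
    // Simplified stand-ins for the real FetchResponseData types.
    record PartitionData(int partitionIndex, int recordBytes) {}
    record TopicResponse(String topic, List<PartitionData> partitions) {}

    // Group a flat, ordered (topic -> partition data) sequence into per-topic
    // responses. LinkedHashMap preserves the order in which topics are first
    // encountered, which is already the shape the response needs, so no
    // second grouping pass is required later.
    static List<TopicResponse> group(List<Map.Entry<String, PartitionData>> flat) {
        Map<String, List<PartitionData>> byTopic = new LinkedHashMap<>();
        for (Map.Entry<String, PartitionData> e : flat) {
            byTopic.computeIfAbsent(e.getKey(), t -> new ArrayList<>()).add(e.getValue());
        }
        List<TopicResponse> out = new ArrayList<>(byTopic.size());
        byTopic.forEach((topic, parts) -> out.add(new TopicResponse(topic, parts)));
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, PartitionData>> flat = List.of(
            Map.entry("logs", new PartitionData(0, 100)),
            Map.entry("metrics", new PartitionData(0, 50)),
            Map.entry("logs", new PartitionData(1, 200)));
        List<TopicResponse> grouped = group(flat);
        System.out.println(grouped.size());                     // 2 topics
        System.out.println(grouped.get(0).partitions().size()); // "logs" has 2 partitions
    }
}
```

Keeping the data in this grouped form is what lets the PR drop `FetchResponse#of` and `PartitionIterator`: the flat-map-to-grouped conversion no longer happens on the response path.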

JMH Tests

  • cpu: intel i9-10900
  • ram: 64 GB
  • IncrementalFetchContextBenchmark

trunk (#10291)

IncrementalFetchContextBenchmark.getResponseSize                                                 avgt   10        18.006 ±       1.749   ms/op
IncrementalFetchContextBenchmark.getResponseSize:·gc.alloc.rate                                  avgt   10       564.158 ±      54.754  MB/sec
IncrementalFetchContextBenchmark.getResponseSize:·gc.alloc.rate.norm                             avgt   10  11190518.325 ±      48.355    B/op
IncrementalFetchContextBenchmark.getResponseSize:·gc.churn.G1_Eden_Space                         avgt   10       564.717 ±     102.891  MB/sec
IncrementalFetchContextBenchmark.getResponseSize:·gc.churn.G1_Eden_Space.norm                    avgt   10  11202996.185 ± 1780869.078    B/op
IncrementalFetchContextBenchmark.getResponseSize:·gc.churn.G1_Old_Gen                            avgt   10         0.001 ±       0.004  MB/sec
IncrementalFetchContextBenchmark.getResponseSize:·gc.churn.G1_Old_Gen.norm                       avgt   10        17.825 ±      85.222    B/op
IncrementalFetchContextBenchmark.getResponseSize:·gc.churn.G1_Survivor_Space                     avgt   10        12.259 ±      19.367  MB/sec
IncrementalFetchContextBenchmark.getResponseSize:·gc.churn.G1_Survivor_Space.norm                avgt   10    252405.775 ±  410781.330    B/op
IncrementalFetchContextBenchmark.getResponseSize:·gc.count                                       avgt   10        48.000                counts
IncrementalFetchContextBenchmark.getResponseSize:·gc.time                                        avgt   10      8665.000                    ms
IncrementalFetchContextBenchmark.updateAndGenerateResponseData                                   avgt   10        16.580 ±       1.551   ms/op
IncrementalFetchContextBenchmark.updateAndGenerateResponseData:·gc.alloc.rate                    avgt   10       581.410 ±      54.691  MB/sec
IncrementalFetchContextBenchmark.updateAndGenerateResponseData:·gc.alloc.rate.norm               avgt   10  10649362.111 ±      69.365    B/op
IncrementalFetchContextBenchmark.updateAndGenerateResponseData:·gc.churn.G1_Eden_Space           avgt   10       569.026 ±      86.815  MB/sec
IncrementalFetchContextBenchmark.updateAndGenerateResponseData:·gc.churn.G1_Eden_Space.norm      avgt   10  10441914.443 ± 1589284.501    B/op
IncrementalFetchContextBenchmark.updateAndGenerateResponseData:·gc.churn.G1_Survivor_Space       avgt   10        11.383 ±      22.870  MB/sec
IncrementalFetchContextBenchmark.updateAndGenerateResponseData:·gc.churn.G1_Survivor_Space.norm  avgt   10    205891.249 ±  413155.900    B/op
IncrementalFetchContextBenchmark.updateAndGenerateResponseData:·gc.count                         avgt   10        45.000                counts
IncrementalFetchContextBenchmark.updateAndGenerateResponseData:·gc.time                          avgt   10      8692.000                    ms

patch (603da4d)

IncrementalFetchContextBenchmark.getResponseSize                                                 avgt   10        15.883 ±       1.696   ms/op
IncrementalFetchContextBenchmark.getResponseSize:·gc.alloc.rate                                  avgt   10       621.511 ±      66.191  MB/sec
IncrementalFetchContextBenchmark.getResponseSize:·gc.alloc.rate.norm                             avgt   10  10823170.429 ±      68.899    B/op
IncrementalFetchContextBenchmark.getResponseSize:·gc.churn.G1_Eden_Space                         avgt   10       615.643 ±     108.319  MB/sec
IncrementalFetchContextBenchmark.getResponseSize:·gc.churn.G1_Eden_Space.norm                    avgt   10  10747008.602 ± 1840451.381    B/op
IncrementalFetchContextBenchmark.getResponseSize:·gc.churn.G1_Survivor_Space                     avgt   10         4.642 ±      10.201  MB/sec
IncrementalFetchContextBenchmark.getResponseSize:·gc.churn.G1_Survivor_Space.norm                avgt   10     86612.407 ±  193293.062    B/op
IncrementalFetchContextBenchmark.getResponseSize:·gc.count                                       avgt   10        49.000                counts
IncrementalFetchContextBenchmark.getResponseSize:·gc.time                                        avgt   10      9855.000                    ms
IncrementalFetchContextBenchmark.updateAndGenerateResponseData                                   avgt   10        16.457 ±       1.462   ms/op
IncrementalFetchContextBenchmark.updateAndGenerateResponseData:·gc.alloc.rate                    avgt   10       601.353 ±      53.868  MB/sec
IncrementalFetchContextBenchmark.updateAndGenerateResponseData:·gc.alloc.rate.norm               avgt   10  10865891.101 ±      60.561    B/op
IncrementalFetchContextBenchmark.updateAndGenerateResponseData:·gc.churn.G1_Eden_Space           avgt   10       597.302 ±      82.682  MB/sec
IncrementalFetchContextBenchmark.updateAndGenerateResponseData:·gc.churn.G1_Eden_Space.norm      avgt   10  10813118.583 ± 1536423.202    B/op
IncrementalFetchContextBenchmark.updateAndGenerateResponseData:·gc.churn.G1_Survivor_Space       avgt   10         2.892 ±       8.490  MB/sec
IncrementalFetchContextBenchmark.updateAndGenerateResponseData:·gc.churn.G1_Survivor_Space.norm  avgt   10     54084.799 ±  159225.286    B/op
IncrementalFetchContextBenchmark.updateAndGenerateResponseData:·gc.count                         avgt   10        48.000                counts
IncrementalFetchContextBenchmark.updateAndGenerateResponseData:·gc.time                          avgt   10      8639.000                    ms

Performance Tests

  • cpu: intel i9-10900
  • ram: 64 GB
  • script: benchmark_test.py::Benchmark.test_consumer_throughput
  • loops: 30

case 0: +2.760595128 %

{
  "compression_type": "none",
  "security_protocol": "PLAINTEXT",
  "interbroker_security_protocol": "PLAINTEXT"
}
  • TRUNK: 259.228705 MB/sec
  • PATCH: 266.38496 MB/sec

case 1: -0.6257537776 %

{
  "compression_type": "snappy",
  "security_protocol": "PLAINTEXT",
  "interbroker_security_protocol": "PLAINTEXT"
}
  • TRUNK: 364.161605 MB/sec
  • PATCH: 361.88285 MB/sec
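The percentage deltas reported for each case are simply the patch throughput relative to trunk. A minimal sketch of the arithmetic (class and method names are illustrative):

```java
public class ThroughputDelta {
    // Percentage change of PATCH relative to TRUNK: (patch - trunk) / trunk * 100.
    static double pct(double trunk, double patch) {
        return (patch - trunk) / trunk * 100.0;
    }

    public static void main(String[] args) {
        // case 0 (no compression): reproduces the reported +2.760595128 %
        System.out.printf("case 0: %+.9f %%%n", pct(259.228705, 266.38496));
        // case 1 (snappy): reproduces the reported -0.6257537776 %
        System.out.printf("case 1: %+.10f %%%n", pct(364.161605, 361.88285));
    }
}
```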

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@chia7712 chia7712 requested a review from ijuma March 5, 2021 05:49
// Iterator that goes over the given partition map and selects partitions that need to be included in the response.
// If updateFetchContextAndRemoveUnselected is set to true, the fetch context will be updated for the selected
// partitions and also remove unselected ones as they are encountered.
private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
chia7712 (Member, Author) commented:

The iterator is unnecessary, since we have to generate a list collection anyway in order to calculate the message size.


def partitionsToLogString(partitions: util.Collection[TopicPartition]): String =
FetchSession.partitionsToLogString(partitions, isTraceEnabled)
def partitionsToLogString(topics: FetchSession.RESP_MAP): String = {
chia7712 (Member, Author) commented:

This method is only used for DEBUG-level logging, so it should be fine to iterate through the whole collection.
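The level-guarded logging pattern referred to here can be sketched as follows; the method name and summary format are illustrative, not Kafka's exact implementation. The point is that the O(n) string construction only happens when the verbose level is actually enabled.

```java
import java.util.List;

public class LogGuardSketch {
    // Building the full partition listing costs an O(n) pass over the
    // collection; when tracing is off, return only a cheap summary so the
    // iteration cost is never paid on the hot path.
    static String partitionsToLogString(List<String> partitions, boolean traceEnabled) {
        if (!traceEnabled) {
            return partitions.size() + " partition(s)";
        }
        return "(" + String.join(", ", partitions) + ")";
    }

    public static void main(String[] args) {
        List<String> parts = List.of("topic-0", "topic-1");
        System.out.println(partitionsToLogString(parts, false)); // cheap summary
        System.out.println(partitionsToLogString(parts, true));  // full listing
    }
}
```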

// the callback for processing a fetch response, invoked before throttling
def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
val partitions = new util.LinkedHashMap[TopicPartition, FetchResponseData.PartitionData]
val topicResponses = new util.ArrayList[FetchResponseData.FetchableTopicResponse]()
chia7712 (Member, Author) commented:

This is the main purpose of this PR: KafkaApis keeps the data grouped.

@dajac dajac self-requested a review March 5, 2021 06:52
dajac (Member) commented Mar 5, 2021

Nice PR! I will take a look at it on Monday.

dajac (Member) left a review comment:

I've left a few comments. I haven't been able to read the whole PR yet.

Four outdated comment threads on core/src/main/scala/kafka/server/KafkaApis.scala
ijuma (Member) commented Mar 5, 2021

Thanks for the PR. Can you check the perf impact of these changes?

chia7712 (Member, Author) commented Mar 8, 2021

Thanks for the PR. Can you check the perf impact of these changes?

sure. will add benchmark results tomorrow.

Comment thread core/src/main/scala/kafka/server/KafkaApis.scala
chia7712 (Member, Author) commented Mar 9, 2021

@ijuma The results of the performance tests are attached. They do not show an obvious performance regression. Will run more tests tomorrow.

ijuma (Member) commented Mar 9, 2021

@chia7712 I was thinking about the jmh microbenchmarks that stress fetch, fetch session and so on.

chia7712 (Member, Author) commented Mar 9, 2021

I was thinking about the jmh microbenchmarks that stress fetch, fetch session and so on.

will copy that.

chia7712 (Member, Author) commented:

@ijuma the JMH results for the fetch session are attached. I tried to write a JMH benchmark that stresses fetch as well, but KafkaApis.handleFetchRequest is hard to benchmark with JMH; it would require a lot of changes ...

jolshan (Member) commented Mar 16, 2021

@chia7712 is there any work in progress for a KafkaApis.handleFetchRequest test? I suspect it would be similar but maybe a bit harder than what I did for the LeaderAndIsr version #10071 (trading replicamanager for fetchmanager, etc). This benchmark would be helpful for #9944 as you could probably guess :)

chia7712 (Member, Author) commented:

is there any work in progress for a KafkaApis.handleFetchRequest test? I suspect it would be similar but maybe a bit harder than what I did for the LeaderAndIsr version #10071 (trading replicamanager for fetchmanager, etc). This benchmark would be helpful for #9944 as you could probably guess :)

this PR is blocked by #9944. This PR (and other related issues) aim to remove all extra collection creation by using auto-generated data. In #9944 we have to create a lot of collections to handle the topic id in fetch request. Hence, I need to rethink the value (and approach) of this PR :)

jolshan (Member) commented Mar 17, 2021

@chia7712 I'm rewriting #9944 to use the autogenerated structures based on this PR. Just pushed a version that simplifies the unresolved topic ID handling. I tried to make it easier to build the fetch response using the data object. Going to try to build the response using the data object in most places today and I can push that version as soon as I can.

chia7712 (Member, Author) commented:

I'm rewriting #9944 to use the autogenerated structures based on this PR. Just pushed a version that simplifies the unresolved topic ID handling. I tried to make it easier to build the fetch response using the data object. Going to try to build the response using the data object in most places today and I can push that version as soon as I can.

sounds good. If the new approach is very different from #9944, please open a new PR in order to compare them :)

jolshan (Member) commented Mar 19, 2021

sounds good. If the new approach is very different from #9944, please open a new PR in order to compare them :)

@chia7712 I've updated the code. I think this is the direction we want so I didn't open a PR.
Biggest changes to the structure are in these commits:
5a0a6d6
a0b2bc9
ace90b0

The idea is that FetchSession can now generate a list of the unresolvedTopics' FetchResponseData.FetchableTopicResponse. Hopefully from there, it is not too difficult to combine with your approach here.

But let me know if it's hard to read. I can open a new one and revert the changes on the old.

@chia7712 chia7712 closed this Mar 25, 2024
@chia7712 chia7712 deleted the KAFKA-12410 branch March 25, 2024 15:21