-
Notifications
You must be signed in to change notification settings - Fork 142
Refactor the handler for METADATA request #1107
Refactor the handler for METADATA request #1107
Conversation
eolivelli
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this work seems very interesting.
I did a quick first pass
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
Show resolved
Hide resolved
wenbingshen
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work., leave some comments. PTAL.
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
Outdated
Show resolved
Hide resolved
|
Thanks for all your reviews because it's better to have more views on such a refactor, though the existing test already covered much. I've addressed all comments now, PTAL again. @eolivelli @wenbingshen @Demogorgon314 |
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
Outdated
Show resolved
Hide resolved
A great code refactoring! |
eolivelli
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very good
I left one comment about concatList I believe it is better to code it another way
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/CoreUtils.java
Outdated
Show resolved
Hide resolved
eolivelli
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Fixes #1104 ### Motivation In addition to #1104 explained, the existing code has too many nested code. It relies on an atomic integer and multiple callbacks to wait all futures until completed. Since the futures' callbacks are not guaranteed to be executed in a single thread, the helper containers must be thread safe. The better solution is leveraging `CompletableFuture.allOf` method to get all futures' results in a single callback and process the results or handling them to another future. ### Modifications - Add some helper methods into `CoreUtils` to wait all futures and perform conversion among lists and maps. - Add `ListPair` class to avoid null check when processing the result of `groupingBy`. - Add unit tests for methods and class above. - Add `TopicAndMetadata` class as a cache of the full topic name and the number of partitions, which can be negative to indicate an error. - Use a more functional programming code style to rewrite the handler for METADATA request. - Correct an incorrect check for all topics METADATA request, see https://github.com/apache/kafka/blob/8d88b20b2779faa413ffc4c6d2546800e225213f/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L162-L164, if `topics` field is empty, only version 0 METADATA request is for all topics. ### TODO The `authorizeTopicsAsync` method and methods from `CoreUtils` can also be used to simplify other request handlers. (cherry picked from commit fbd91a8)
Fixes #1104 ### Motivation In addition to #1104 explained, the existing code has too many nested code. It relies on an atomic integer and multiple callbacks to wait all futures until completed. Since the futures' callbacks are not guaranteed to be executed in a single thread, the helper containers must be thread safe. The better solution is leveraging `CompletableFuture.allOf` method to get all futures' results in a single callback and process the results or handling them to another future. ### Modifications - Add some helper methods into `CoreUtils` to wait all futures and perform conversion among lists and maps. - Add `ListPair` class to avoid null check when processing the result of `groupingBy`. - Add unit tests for methods and class above. - Add `TopicAndMetadata` class as a cache of the full topic name and the number of partitions, which can be negative to indicate an error. - Use a more functional programming code style to rewrite the handler for METADATA request. - Correct an incorrect check for all topics METADATA request, see https://github.com/apache/kafka/blob/8d88b20b2779faa413ffc4c6d2546800e225213f/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L162-L164, if `topics` field is empty, only version 0 METADATA request is for all topics. ### TODO The `authorizeTopicsAsync` method and methods from `CoreUtils` can also be used to simplify other request handlers. (cherry picked from commit fbd91a8)
|
This PR will be cherry-picked to branch-2.8.2 after #1120 is merged. |
Fixes #1104 ### Motivation In addition to #1104 explained, the existing code has too many nested code. It relies on an atomic integer and multiple callbacks to wait all futures until completed. Since the futures' callbacks are not guaranteed to be executed in a single thread, the helper containers must be thread safe. The better solution is leveraging `CompletableFuture.allOf` method to get all futures' results in a single callback and process the results or handling them to another future. ### Modifications - Add some helper methods into `CoreUtils` to wait all futures and perform conversion among lists and maps. - Add `ListPair` class to avoid null check when processing the result of `groupingBy`. - Add unit tests for methods and class above. - Add `TopicAndMetadata` class as a cache of the full topic name and the number of partitions, which can be negative to indicate an error. - Use a more functional programming code style to rewrite the handler for METADATA request. - Correct an incorrect check for all topics METADATA request, see https://github.com/apache/kafka/blob/8d88b20b2779faa413ffc4c6d2546800e225213f/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L162-L164, if `topics` field is empty, only version 0 METADATA request is for all topics. ### TODO The `authorizeTopicsAsync` method and methods from `CoreUtils` can also be used to simplify other request handlers. (cherry picked from commit fbd91a8)
#1140) ### Motivation After #1107 was cherry-picked into branch-2.8.2, some tests of `KafkaListenerNameTest` are failed. Because that PR forgot to migrate the following code: https://github.com/streamnative/kop/blob/51563feb582424251c8a8345ed9a7606546cf77e/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java#L792-L796 After that when the advertised listener is different with the listener, the brokers field in `MetadataResponse` doesn't contain the advertised listener, while the leader and ISR in `MetadataResponse.PartitionMetadata` use the advertised listener. Then Kafka client won't be able to find the advertised listener in brokers field. ### Modifications Add the partition leader's node to the brokers field if the brokers field doesn't contain the node.
Fixes streamnative#1104 In addition to streamnative#1104 explained, the existing code has too many nested code. It relies on an atomic integer and multiple callbacks to wait all futures until completed. Since the futures' callbacks are not guaranteed to be executed in a single thread, the helper containers must be thread safe. The better solution is leveraging `CompletableFuture.allOf` method to get all futures' results in a single callback and process the results or handling them to another future. - Add some helper methods into `CoreUtils` to wait all futures and perform conversion among lists and maps. - Add `ListPair` class to avoid null check when processing the result of `groupingBy`. - Add unit tests for methods and class above. - Add `TopicAndMetadata` class as a cache of the full topic name and the number of partitions, which can be negative to indicate an error. - Use a more functional programming code style to rewrite the handler for METADATA request. - Correct an incorrect check for all topics METADATA request, see https://github.com/apache/kafka/blob/8d88b20b2779faa413ffc4c6d2546800e225213f/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L162-L164, if `topics` field is empty, only version 0 METADATA request is for all topics. The `authorizeTopicsAsync` method and methods from `CoreUtils` can also be used to simplify other request handlers. (cherry picked from commit fbd91a8) (cherry picked from commit 2d8203f56d3789412dfb60ad93ece585e9e59683)
Fixes #1104
Motivation
In addition to #1104 explained, the existing code has too many nested code. It relies on an atomic integer and multiple callbacks to wait all futures until completed. Since the futures' callbacks are not guaranteed to be executed in a single thread, the helper containers must be thread safe. The better solution is leveraging
CompletableFuture.allOfmethod to get all futures' results in a single callback and process the results or handling them to another future.Modifications
CoreUtilsto wait all futures and perform conversion among lists and maps.ListPairclass to avoid null check when processing the result ofgroupingBy.TopicAndMetadataclass as a cache of the full topic name and the number of partitions, which can be negative to indicate an error.topicsfield is empty, only version 0 METADATA request is for all topics.TODO
The
authorizeTopicsAsyncmethod and methods fromCoreUtilscan also be used to simplify other request handlers.