KAFKA-15585: Add DescribeTopics API server side support #14612
mumrah merged 30 commits into apache:trunk
Conversation
Force-pushed from 637d45a to 35f9763.
mumrah
left a comment
Thanks @CalvinConfluent!
Left some comments inline
@CalvinConfluent btw, have you updated the KIP to reflect the two RPC schemas you've corrected here?

@mumrah Thanks for the review, KIP updated.
val cursor = describeTopicPartitionsRequest.cursor()
val fetchAllTopics = topics.isEmpty
if (fetchAllTopics) {
  kRaftMetadataCache.getAllTopics().foreach(topic => topics.append(topic))
If we copy and sort all the topic names anyway, do we need to change the underlying data structure to NavigableMap? We could just use this list to traverse topic info and it will be in order.
In the fetch-all path, no additional sort is required. I did not see a good way to convert a Java list to a Scala mutable list, so I made the copy.
I use a mutable list for two reasons:
- It is easier to filter out the topics that sort alphabetically ahead of the cursor topic.
- In the fetch-all case, I think we should still include the cursor topic in the response even if it does not exist. A mutable list makes that easier.
But if you are asking whether it is worth the effort to build the full set of underlying structures to get an ordered list, when we could just sort the topic list, I am not sure.
As discussed offline, we will focus on the pagination behavior.
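The cursor-based filtering described above can be sketched roughly as follows. This is an illustrative sketch, not the PR's actual code: the names `topicsFromCursor`, `allTopics`, and `cursorTopic` are hypothetical, and the sketch assumes the cursor topic should be kept in the result even when it no longer exists.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical sketch of cursor-based topic filtering; names are illustrative.
object CursorFilterSketch {
  def topicsFromCursor(allTopics: Seq[String], cursorTopic: Option[String]): List[String] = {
    val topics = new ListBuffer[String]()
    allTopics.sorted.foreach(t => topics.append(t))
    cursorTopic match {
      case Some(cursor) =>
        // Drop topics that sort alphabetically ahead of the cursor topic...
        val remaining = topics.filter(_ >= cursor)
        // ...but keep the cursor topic itself even if it does not exist anymore.
        if (!remaining.contains(cursor)) cursor +=: remaining
        remaining.toList
      case None => topics.toList
    }
  }
}
```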
mumrah
left a comment
Thanks for the updates @CalvinConfluent! Looks like there are some conflicts with trunk.
I think we should add an integration "request" test for the new RPC. See ApiVersionsRequestTest for a basic example. We can also do this as a follow-up.
Force-pushed from aa1c51b to b2bdf53.
@mumrah Thanks for the review. The integration tests will be introduced in the client-side change.
This reverts commit b7cd9f2.
val result = new ListBuffer[DescribeTopicPartitionsResponsePartition]()
val endIndex = upperIndex.min(topic.partitions().size())
for (partitionId <- startIndex until endIndex) {
  val partition = topic.partitions().get(partitionId)
What if partition doesn't exist?
Do you mean the partitions in the topic are not consecutive? Just realized it is possible.
Actually it is not possible: the partition index starts at 0 and increments by 1.
Then what would be the case where the partition does not exist?
The data structure leaves a possibility (due to a bug or a change elsewhere) to have arbitrary numbers. It would be good not to crash if the current assumptions are violated.
Sure, updated.
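The defensive handling agreed on above — not crashing if a partition id in the range is missing — can be sketched like this. The names (`collectPartitions`, the `Map[Int, String]` stand-in for the partition registry) are hypothetical, not the PR's actual types.

```scala
// Hypothetical sketch: skip missing partition ids instead of throwing,
// in case the id space turns out not to be contiguous.
object DefensiveLookupSketch {
  def collectPartitions(partitions: Map[Int, String], startIndex: Int, endIndex: Int): List[String] = {
    (startIndex until endIndex).flatMap { partitionId =>
      // Option-based lookup: a gap in the id space is silently skipped.
      partitions.get(partitionId)
    }.toList
  }
}
```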
if (!partitionResponse.isDefined) {
  val error = try {
    Topic.validate(topicName)
    Errors.UNKNOWN_TOPIC_OR_PARTITION
Yeah, but the error is kind of unexpected -- if the user didn't specify a topic in the first place, why would it get an error about a topic that doesn't exist?
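The error-selection pattern in the snippet — validate the name, and map a validation failure to an invalid-topic error while a valid-but-absent topic maps to unknown-topic — can be sketched as below. `validateTopicName` is an illustrative stand-in for `Topic.validate`, and the string error names are placeholders for the `Errors` enum constants.

```scala
// Hypothetical sketch of the error-selection pattern above; names are illustrative.
object TopicErrorSketch {
  // Stand-in for Topic.validate: throws on a name that cannot be a topic.
  def validateTopicName(name: String): Unit =
    if (name.isEmpty || !name.forall(c => c.isLetterOrDigit || c == '.' || c == '_' || c == '-'))
      throw new IllegalArgumentException(s"Invalid topic name: $name")

  def errorFor(topicName: String): String =
    try {
      validateTopicName(topicName)
      // The name is legal but no such topic exists.
      "UNKNOWN_TOPIC_OR_PARTITION"
    } catch {
      case _: IllegalArgumentException => "INVALID_TOPIC_EXCEPTION"
    }
}
```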
mumrah
left a comment
Thanks for the updates @CalvinConfluent, I like the new iterator approach. I left just one comment on that inline.
I also like that you wrote the new request handler in Java. I think that's a first 😄
  .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
  .setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
// The partition id may not be consecutive.
val partitions = topic.partitions().keySet().stream().sorted().iterator()
This has O(N*logN) runtime complexity and O(N) space complexity. We could do O(N) complexity and not have an extra copy if we just iterate over all partitions and filter the ones that fit into the required range (one of your previous implementations had this).
I am not sure I get it. The partition IDs can be arbitrary, as in the unit-test cases, and I don't have a simple O(n), no-extra-space solution off the top of my head. Quickselect could do the trick, but it is not generically supported by Java.
Instead, I use a tree set to maintain the K smallest partition IDs at or after the start index. This is better than the original sort.
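The bounded tree-set idea can be sketched as follows: keep only the k smallest ids at or after the cursor's start id, for O(n log k) time and O(k) space instead of sorting the whole id set. This is a sketch with hypothetical names (`smallestK`, `startId`), not the PR's actual handler code.

```scala
import java.util.TreeSet

// Hypothetical sketch of the bounded tree-set approach; names are illustrative.
object TopKPartitionsSketch {
  def smallestK(partitionIds: Iterable[Int], startId: Int, k: Int): List[Int] = {
    val set = new TreeSet[Integer]()
    partitionIds.foreach { id =>
      if (id >= startId) {
        set.add(id)
        // Evict the current largest once the set holds more than k ids.
        if (set.size > k) set.pollLast()
      }
    }
    // TreeSet iterates in ascending order, so the result is already sorted.
    val builder = List.newBuilder[Int]
    val it = set.iterator()
    while (it.hasNext) builder += it.next()
    builder.result()
  }
}
```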
  .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
  .setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
}
val partitions = topic.partitions().keySet()
Looks like here we just need to remember the size? Or maybe calculate the nextIndex directly here?
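Computing the next cursor index directly from the partition count, as the comment suggests, could look roughly like this. `nextIndex` and its parameters are hypothetical names for illustration.

```scala
// Hypothetical sketch: derive the next pagination cursor without materializing the key set.
object NextIndexSketch {
  def nextIndex(partitionCount: Int, startIndex: Int, limit: Int): Option[Int] = {
    val end = math.min(startIndex + limit, partitionCount)
    // Some(...) means more partitions remain beyond this page; None means we are done.
    if (end < partitionCount) Some(end) else None
  }
}
```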
val maybeLeader = getAliveEndpoint(image, partition.leader, listenerName)
maybeLeader match {
  case None =>
    val error = if (!image.cluster().brokers.containsKey(partition.leader)) {
I guess we need to see what the client does with the error code.
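The branch in the snippet distinguishes two failure modes when the leader endpoint is not alive. A plausible reading, following the pattern used by the metadata RPCs, is sketched below; the exact error constants chosen here are an assumption, and all names are illustrative.

```scala
// Hypothetical sketch of the leader error choice: an unregistered leader broker vs.
// a registered broker that does not expose the requested listener. The specific
// error names are an assumption, not confirmed from the PR.
object LeaderErrorSketch {
  def leaderError(registeredBrokers: Set[Int], brokersWithListener: Set[Int], leaderId: Int): String =
    if (!registeredBrokers.contains(leaderId)) "LEADER_NOT_AVAILABLE"
    else if (!brokersWithListener.contains(leaderId)) "LISTENER_NOT_FOUND"
    else "NONE"
}
```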
Verified the following tests locally.
mumrah
left a comment
Thanks for all the work on this @CalvinConfluent. LGTM
Please double check that the failing tests on Jenkins look okay locally.
@mumrah Thanks! I have verified that the failing tests pass locally.
KAFKA-15585: Add DescribeTopics API server side support (#14612)
This patch implements the new DescribeTopicPartitions RPC as defined in KIP-966 (ELR). Additionally, this patch adds a broker config "max.request.partition.size.limit" which limits the number of partitions returned by the new RPC.
Reviewers: Artem Livshits <alivshits@confluent.io>, Jason Gustafson <jason@confluent.io>, David Arthur <mumrah@gmail.com>
Introduce the DescribeTopics API and the server-side handling code.
https://issues.apache.org/jira/browse/KAFKA-15585