Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -2129,7 +2128,7 @@ else if (topics instanceof TopicNameCollection)
throw new IllegalArgumentException("The TopicCollection: " + topics + " provided did not match any supported classes for describeTopics.");
}

Call generateDescribeTopicsCallWithMetadataApi(
private Call generateDescribeTopicsCallWithMetadataApi(
List<String> topicNamesList,
Map<String, KafkaFutureImpl<TopicDescription>> topicFutures,
DescribeTopicsOptions options,
Expand Down Expand Up @@ -2192,7 +2191,7 @@ void handleFailure(Throwable throwable) {
};
}

Call generateDescribeTopicsCallWithDescribeTopicPartitionsApi(
private Call generateDescribeTopicsCallWithDescribeTopicPartitionsApi(
List<String> topicNamesList,
Map<String, KafkaFutureImpl<TopicDescription>> topicFutures,
Map<Integer, Node> nodes,
Expand Down Expand Up @@ -2319,27 +2318,27 @@ private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWi
}

if (topicNamesList.isEmpty()) {
return new HashMap<>(topicFutures);
return Collections.unmodifiableMap(topicFutures);
}

// First, we need to retrieve the node info.
DescribeClusterResult clusterResult = describeCluster();
Map<Integer, Node> nodes;
try {
nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node));
} catch (InterruptedException | ExecutionException e) {
completeAllExceptionally(topicFutures.values(), e.getCause());
return new HashMap<>(topicFutures);
}

final long now = time.milliseconds();
clusterResult.nodes().whenComplete(
(nodes, exception) -> {
if (exception != null) {
completeAllExceptionally(topicFutures.values(), exception.getCause());
return;
}

runnable.call(
generateDescribeTopicsCallWithDescribeTopicPartitionsApi(topicNamesList, topicFutures, nodes, options, now),
now
);
final long now = time.milliseconds();
Map<Integer, Node> nodeIdMap = nodes.stream().collect(Collectors.toMap(Node::id, node -> node));
runnable.call(
generateDescribeTopicsCallWithDescribeTopicPartitionsApi(topicNamesList, topicFutures, nodeIdMap, options, now),
now
);
});

return new HashMap<>(topicFutures);
return Collections.unmodifiableMap(topicFutures);
}

private Map<Uuid, KafkaFuture<TopicDescription>> handleDescribeTopicsByIds(Collection<Uuid> topicIds, DescribeTopicsOptions options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,21 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertFutureExceptionTypeEquals(results.get(nonExistingTopicId), classOf[UnknownTopicIdException])
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeTopicsWithNames(quorum: String): Unit = {
  client = createAdminClient

  // Create a single-partition, RF=1 topic and wait until it is visible in metadata.
  val topicName = "existing-topic"
  val topicsToCreate = Seq(new NewTopic(topicName, 1, 1.toShort))
  client.createTopics(topicsToCreate.asJava).all.get()
  waitForTopics(client, Seq(topicName), List())
  ensureConsistentKRaftMetadata()

  // The topic id reported by describeTopics (by name) must match the broker's metadata cache.
  val expectedTopicId = brokers.head.metadataCache.getTopicId(topicName)
  val descriptions = client.describeTopics(TopicCollection.ofTopicNames(Seq(topicName).asJava)).topicNameValues()
  assertEquals(expectedTopicId, descriptions.get(topicName).get.topicId())
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeCluster(quorum: String): Unit = {
Expand Down