From 7ec04b222a9ec23e21ff404c2197f677405f5b34 Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Wed, 5 Jun 2024 16:16:26 +0100 Subject: [PATCH 1/2] Fixed potential thread block by admin API --- .../kafka/clients/admin/KafkaAdminClient.java | 34 +++++++++---------- .../api/PlaintextAdminIntegrationTest.scala | 15 ++++++++ 2 files changed, 32 insertions(+), 17 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 5c8d9ebb7990e..8e0d68689a2bd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -2129,7 +2129,7 @@ else if (topics instanceof TopicNameCollection) throw new IllegalArgumentException("The TopicCollection: " + topics + " provided did not match any supported classes for describeTopics."); } - Call generateDescribeTopicsCallWithMetadataApi( + private Call generateDescribeTopicsCallWithMetadataApi( List topicNamesList, Map> topicFutures, DescribeTopicsOptions options, @@ -2192,7 +2192,7 @@ void handleFailure(Throwable throwable) { }; } - Call generateDescribeTopicsCallWithDescribeTopicPartitionsApi( + private Call generateDescribeTopicsCallWithDescribeTopicPartitionsApi( List topicNamesList, Map> topicFutures, Map nodes, @@ -2319,27 +2319,27 @@ private Map> handleDescribeTopicsByNamesWi } if (topicNamesList.isEmpty()) { - return new HashMap<>(topicFutures); + return Collections.unmodifiableMap(topicFutures); } // First, we need to retrieve the node info. DescribeClusterResult clusterResult = describeCluster(); - Map nodes; - try { - nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node)); - } catch (InterruptedException | ExecutionException e) { - completeAllExceptionally(topicFutures.values(), e.getCause()); - return new HashMap<>(topicFutures); - } - - final long now = time.milliseconds(); + clusterResult.nodes().whenComplete( + (nodes, exception) -> { + if (exception != null) { + completeAllExceptionally(topicFutures.values(), exception.getCause()); + return; + } - runnable.call( - generateDescribeTopicsCallWithDescribeTopicPartitionsApi(topicNamesList, topicFutures, nodes, options, now), - now - ); + final long now = time.milliseconds(); + Map nodeIdMap = nodes.stream().collect(Collectors.toMap(Node::id, node -> node)); + runnable.call( + generateDescribeTopicsCallWithDescribeTopicPartitionsApi(topicNamesList, topicFutures, nodeIdMap, options, now), + now + ); + }); - return new HashMap<>(topicFutures); + return Collections.unmodifiableMap(topicFutures); } private Map> handleDescribeTopicsByIds(Collection topicIds, DescribeTopicsOptions options) { diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index ffe10c19c4392..2a21873ed208b 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -204,6 +204,21 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertFutureExceptionTypeEquals(results.get(nonExistingTopicId), classOf[UnknownTopicIdException]) } + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeTopicsWithNames(quorum: String): Unit = { + client = createAdminClient + + val existingTopic = "existing-topic" + client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1, 1.toShort)).asJava).all.get() + waitForTopics(client, Seq(existingTopic), List()) + ensureConsistentKRaftMetadata() + + val existingTopicId = brokers.head.metadataCache.getTopicId(existingTopic) + val results = client.describeTopics(TopicCollection.ofTopicNames(Seq(existingTopic).asJava)).topicNameValues() + assertEquals(existingTopicId, results.get(existingTopic).get.topicId()) + } + @ParameterizedTest @ValueSource(strings = Array("zk", "kraft")) def testDescribeCluster(quorum: String): Unit = { From d2957a8ca2a4add68a102d882f8011613e611ce1 Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Thu, 6 Jun 2024 11:25:20 +0100 Subject: [PATCH 2/2] Fixing checkstyle error --- .../java/org/apache/kafka/clients/admin/KafkaAdminClient.java | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 8e0d68689a2bd..0bd5bcd3359f9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -276,7 +276,6 @@ import java.util.OptionalLong; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong;