From 5003cd7b10e535a87b5be0afe327113ce7965542 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 13 Oct 2025 13:53:01 -0700 Subject: [PATCH] [SPARK-XXX] Remove deprecated API usages of DescribeTopicsResult KAFKA-10774 Support Describe topic using topic IDs KAFKA-18289 Remove deprecated methods of DescribeTopicsResult --- .../spark/sql/kafka010/ConsumerStrategy.scala | 17 +++++++++-------- .../spark/sql/kafka010/KafkaTestUtils.scala | 2 +- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala index ab41e53d8ffbe..2d51d8d8c35d4 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala @@ -62,14 +62,15 @@ private[kafka010] sealed trait ConsumerStrategy extends Logging { .build() protected def retrieveAllPartitions(admin: Admin, topics: Set[String]): Set[TopicPartition] = { - admin.describeTopics(topics.asJava).all().get().asScala.filterNot(_._2.isInternal).flatMap { - case (topic, topicDescription) => - topicDescription.partitions().asScala.map { topicPartitionInfo => - val partition = topicPartitionInfo.partition() - logDebug(s"Partition found: $topic:$partition") - new TopicPartition(topic, partition) - } - }.toSet + admin.describeTopics(topics.asJava).allTopicNames().get().asScala.filterNot(_._2.isInternal) + .flatMap { + case (topic, topicDescription) => + topicDescription.partitions().asScala.map { topicPartitionInfo => + val partition = topicPartitionInfo.partition() + logDebug(s"Partition found: $topic:$partition") + new TopicPartition(topic, partition) + } + }.toSet } } diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 0e1ca7af14a43..b31f6af1c794c 100644 --- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -454,7 +454,7 @@ class KafkaTestUtils( } private def getOffsets(topics: Set[String], offsetSpec: OffsetSpec): Map[TopicPartition, Long] = { - val listOffsetsParams = adminClient.describeTopics(topics.asJava).all().get().asScala + val listOffsetsParams = adminClient.describeTopics(topics.asJava).allTopicNames().get().asScala .flatMap { topicDescription => topicDescription._2.partitions().asScala.map { topicPartitionInfo => new TopicPartition(topicDescription._1, topicPartitionInfo.partition())