From 7d16b46860aeed5b51aed04e0ada2c873d13793c Mon Sep 17 00:00:00 2001 From: Andras Katona Date: Thu, 14 May 2020 18:25:09 +0200 Subject: [PATCH 1/2] KAFKA-9992: EmbeddedKafkaCluster.deleteTopicAndWait not working with kafka_2.13 Eliminating JavaConverters in EmbeddedKafkaCluster Change-Id: I71d3daee20b4717a3eda602e3e310b431b23c55c --- .../integration/utils/EmbeddedKafkaCluster.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index 804c1fcfb3a7c..8136d24399880 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -29,7 +29,6 @@ import org.junit.rules.ExternalResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.collection.JavaConverters; import java.io.IOException; import java.util.ArrayList; @@ -284,8 +283,7 @@ public void deleteTopicsAndWait(final long timeoutMs, final String... topics) th * @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0}) */ public void deleteAllTopicsAndWait(final long timeoutMs) throws InterruptedException { - final Set topics = JavaConverters.setAsJavaSetConverter( - brokers[0].kafkaServer().zkClient().getAllTopicsInCluster(false)).asJava(); + final Set topics = getAllTopicsInCluster(); for (final String topic : topics) { try { brokers[0].deleteTopic(topic); @@ -314,8 +312,7 @@ private TopicsDeletedCondition(final Collection topics) { @Override public boolean conditionMet() { - final Set allTopics = new HashSet<>(JavaConverters.setAsJavaSetConverter( - brokers[0].kafkaServer().zkClient().getAllTopicsInCluster(false)).asJava()); + final Set allTopics = getAllTopicsInCluster(); return !allTopics.removeAll(deletedTopics); } } @@ -329,8 +326,7 @@ private TopicsRemainingCondition(final String... topics) { @Override public boolean conditionMet() { - final Set allTopics = JavaConverters.setAsJavaSetConverter( - brokers[0].kafkaServer().zkClient().getAllTopicsInCluster(false)).asJava(); + final Set allTopics = getAllTopicsInCluster(); return allTopics.equals(remainingTopics); } } @@ -348,6 +344,11 @@ public Properties getLogConfig(final String topic) { } public Set getAllTopicsInCluster() { - return JavaConverters.setAsJavaSetConverter(brokers[0].kafkaServer().zkClient().getAllTopicsInCluster(false)).asJava(); + scala.collection.Iterator topicsIterator = brokers[0].kafkaServer().zkClient().getAllTopicsInCluster(false).iterator(); + Set topics = new HashSet<>(); + while (topicsIterator.hasNext()) { + topics.add(topicsIterator.next()); + } + return topics; } } From 7f79752258ee0f1483ba87ae8385db02cf362aac Mon Sep 17 00:00:00 2001 From: Andras Katona Date: Mon, 18 May 2020 11:50:18 +0200 Subject: [PATCH 2/2] checkstyle fix --- .../kafka/streams/integration/utils/EmbeddedKafkaCluster.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index 8136d24399880..af83248404ae9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -344,8 +344,8 @@ public Properties getLogConfig(final String topic) { } public Set getAllTopicsInCluster() { - scala.collection.Iterator topicsIterator = brokers[0].kafkaServer().zkClient().getAllTopicsInCluster(false).iterator(); - Set topics = new HashSet<>(); + final scala.collection.Iterator topicsIterator = brokers[0].kafkaServer().zkClient().getAllTopicsInCluster(false).iterator(); + final Set topics = new HashSet<>(); while (topicsIterator.hasNext()) { topics.add(topicsIterator.next()); }