diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index 804c1fcfb3a7c..af83248404ae9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -29,7 +29,6 @@ import org.junit.rules.ExternalResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.collection.JavaConverters; import java.io.IOException; import java.util.ArrayList; @@ -284,8 +283,7 @@ public void deleteTopicsAndWait(final long timeoutMs, final String... topics) th * @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0}) */ public void deleteAllTopicsAndWait(final long timeoutMs) throws InterruptedException { - final Set topics = JavaConverters.setAsJavaSetConverter( - brokers[0].kafkaServer().zkClient().getAllTopicsInCluster(false)).asJava(); + final Set topics = getAllTopicsInCluster(); for (final String topic : topics) { try { brokers[0].deleteTopic(topic); @@ -314,8 +312,7 @@ private TopicsDeletedCondition(final Collection topics) { @Override public boolean conditionMet() { - final Set allTopics = new HashSet<>(JavaConverters.setAsJavaSetConverter( - brokers[0].kafkaServer().zkClient().getAllTopicsInCluster(false)).asJava()); + final Set allTopics = getAllTopicsInCluster(); return !allTopics.removeAll(deletedTopics); } } @@ -329,8 +326,7 @@ private TopicsRemainingCondition(final String... topics) { @Override public boolean conditionMet() { - final Set allTopics = JavaConverters.setAsJavaSetConverter( - brokers[0].kafkaServer().zkClient().getAllTopicsInCluster(false)).asJava(); + final Set allTopics = getAllTopicsInCluster(); return allTopics.equals(remainingTopics); } } @@ -348,6 +344,11 @@ public Properties getLogConfig(final String topic) { } public Set getAllTopicsInCluster() { - return JavaConverters.setAsJavaSetConverter(brokers[0].kafkaServer().zkClient().getAllTopicsInCluster(false)).asJava(); + final scala.collection.Iterator topicsIterator = brokers[0].kafkaServer().zkClient().getAllTopicsInCluster(false).iterator(); + final Set topics = new HashSet<>(); + while (topicsIterator.hasNext()) { + topics.add(topicsIterator.next()); + } + return topics; } }