diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 574e9c66e291a..1acb5b63706bc 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -649,7 +649,7 @@ private void maybeDeleteInternalTopics(final Admin adminClient, final boolean dr // visible for testing public void doDelete(final List topicsToDelete, - final Admin adminClient) { + final Admin adminClient) { boolean hasDeleteErrors = false; final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete); final Map> results = deleteTopicsResult.values(); @@ -673,11 +673,17 @@ private boolean isInternalTopic(final String topicName) { // Even is this is not expected in general, we need to exclude those topics here // and don't consider them as internal topics even if they follow the same naming schema. // Cf. https://issues.apache.org/jira/browse/KAFKA-7930 - return !isInputTopic(topicName) && !isIntermediateTopic(topicName) - && topicName.startsWith(options.valueOf(applicationIdOption) + "-") - && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition") - || topicName.endsWith("-subscription-registration-topic") - || topicName.endsWith("-subscription-response-topic")); + return !isInputTopic(topicName) && !isIntermediateTopic(topicName) && topicName.startsWith(options.valueOf(applicationIdOption) + "-") + && matchesInternalTopicFormat(topicName); + } + + // visible for testing + public boolean matchesInternalTopicFormat(final String topicName) { + return topicName.endsWith("-changelog") || topicName.endsWith("-repartition") + || topicName.endsWith("-subscription-registration-topic") + || topicName.endsWith("-subscription-response-topic") + || topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-\\d+-topic") + || topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic"); } public static void main(final String[] args) { diff --git a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java index 32d616f3fe356..aa2469ac14ba2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java @@ -42,6 +42,7 @@ import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class StreamsResetterTest { @@ -242,6 +243,14 @@ public void shouldDeleteTopic() throws InterruptedException, ExecutionException } } + @Test + public void shouldDetermineInternalTopicBasedOnTopicName1() { + assertTrue(streamsResetter.matchesInternalTopicFormat("appId-named-subscription-response-topic")); + assertTrue(streamsResetter.matchesInternalTopicFormat("appId-named-subscription-registration-topic")); + assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-12323232-topic")); + assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic")); + } + private Cluster createCluster(final int numNodes) { final HashMap nodes = new HashMap<>(); for (int i = 0; i < numNodes; ++i) {