Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions core/src/main/scala/kafka/tools/StreamsResetter.java
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ private void maybeDeleteInternalTopics(final Admin adminClient, final boolean dr

// visible for testing
public void doDelete(final List<String> topicsToDelete,
final Admin adminClient) {
final Admin adminClient) {
boolean hasDeleteErrors = false;
final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();
Expand All @@ -673,11 +673,17 @@ private boolean isInternalTopic(final String topicName) {
// Even is this is not expected in general, we need to exclude those topics here
// and don't consider them as internal topics even if they follow the same naming schema.
// Cf. https://issues.apache.org/jira/browse/KAFKA-7930
return !isInputTopic(topicName) && !isIntermediateTopic(topicName)
&& topicName.startsWith(options.valueOf(applicationIdOption) + "-")
&& (topicName.endsWith("-changelog") || topicName.endsWith("-repartition")
|| topicName.endsWith("-subscription-registration-topic")
|| topicName.endsWith("-subscription-response-topic"));
return !isInputTopic(topicName) && !isIntermediateTopic(topicName) && topicName.startsWith(options.valueOf(applicationIdOption) + "-")
&& matchesInternalTopicFormat(topicName);
}

// visible for testing
public boolean matchesInternalTopicFormat(final String topicName) {
return topicName.endsWith("-changelog") || topicName.endsWith("-repartition")
|| topicName.endsWith("-subscription-registration-topic")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a suggestion comment: now looking at this I think maybe we should not add the -topic suffix for those topics since we did not for repartition and changelog :)

|| topicName.endsWith("-subscription-response-topic")
|| topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-\\d+-topic")
|| topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic");
}

public static void main(final String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.ExecutionException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class StreamsResetterTest {
Expand Down Expand Up @@ -242,6 +243,14 @@ public void shouldDeleteTopic() throws InterruptedException, ExecutionException
}
}

@Test
public void shouldDetermineInternalTopicBasedOnTopicName1() {
assertTrue(streamsResetter.matchesInternalTopicFormat("appId-named-subscription-response-topic"));
assertTrue(streamsResetter.matchesInternalTopicFormat("appId-named-subscription-registration-topic"));
assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-12323232-topic"));
assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic"));
}

private Cluster createCluster(final int numNodes) {
final HashMap<Integer, Node> nodes = new HashMap<>();
for (int i = 0; i < numNodes; ++i) {
Expand Down