Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ private boolean isMatch(final String topic) {

@Override
Source describe() {
return new Source(name, new HashSet<>(topics), pattern);
return new Source(name, topics.size() == 0 ? null : new HashSet<>(topics), pattern);
}
}

Expand Down Expand Up @@ -1281,6 +1281,9 @@ private static class NodeComparator implements Comparator<TopologyDescription.No
@Override
public int compare(final TopologyDescription.Node node1,
final TopologyDescription.Node node2) {
if (node1.equals(node2)) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While debugging the issue, I figure that the use comparators don't do deep comparison. This is the corresponding fix. (Similar below)

return 0;
}
final int size1 = ((AbstractNode) node1).size;
final int size2 = ((AbstractNode) node2).size;

Expand Down Expand Up @@ -1399,6 +1402,7 @@ public abstract static class AbstractNode implements TopologyDescription.Node {
int size;

AbstractNode(final String name) {
Objects.requireNonNull(name, "name cannot be null");
this.name = name;
this.size = 1;
}
Expand Down Expand Up @@ -1435,6 +1439,13 @@ public Source(final String name,
final Set<String> topics,
final Pattern pattern) {
super(name);
if (topics == null && pattern == null) {
throw new IllegalArgumentException("Either topics or pattern must be not-null, but both are null.");
}
if (topics != null && pattern != null) {
throw new IllegalArgumentException("Either topics or pattern must be null, but both are not null.");
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What should happen if the topic or the pattern is empty?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think those are special cases. For Pattern.compile(null) it throws NPE, so we would never hit this code but fail before.

Pattern.compile("") should be fine but just don't match any topic. Similar, to empty topic list. For both cases, the consumer should just be idle.

\cc @guozhangwang or @hachikuji to confirm.

this.topics = topics;
this.topicPattern = pattern;
}
Expand Down Expand Up @@ -1479,8 +1490,10 @@ public boolean equals(final Object o) {
final Source source = (Source) o;
// omit successor to avoid infinite loops
return name.equals(source.name)
&& topics.equals(source.topics)
&& topicPattern.equals(source.topicPattern);
&& (topics == null && source.topics == null
|| topics != null && topics.equals(source.topics))
&& (topicPattern == null && source.topicPattern == null
|| topicPattern != null && topicPattern.pattern().equals(source.topicPattern.pattern()));
}

@Override
Expand Down Expand Up @@ -1709,6 +1722,9 @@ private static class GlobalStoreComparator implements Comparator<TopologyDescrip
@Override
public int compare(final TopologyDescription.GlobalStore globalStore1,
final TopologyDescription.GlobalStore globalStore2) {
if (globalStore1.equals(globalStore2)) {
return 0;
}
return globalStore1.id() - globalStore2.id();
}
}
Expand All @@ -1719,6 +1735,9 @@ private static class SubtopologyComparator implements Comparator<TopologyDescrip
@Override
public int compare(final TopologyDescription.Subtopology subtopology1,
final TopologyDescription.Subtopology subtopology2) {
if (subtopology1.equals(subtopology2)) {
return 0;
}
return subtopology1.id() - subtopology2.id();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
Expand All @@ -50,12 +48,14 @@
import static java.time.Duration.ofSeconds;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

import static org.hamcrest.Matchers.not;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -353,9 +353,9 @@ public void testTopicGroups() {
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();

final Map<Integer, InternalTopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap()));
expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.emptyMap(), Collections.emptyMap()));
expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-3", "topic-4"), Collections.emptyMap(), Collections.emptyMap()));
expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-5"), Collections.emptyMap(), Collections.emptyMap()));

assertEquals(3, topicGroups.size());
assertEquals(expectedTopicGroups, topicGroups);
Expand Down Expand Up @@ -393,17 +393,17 @@ public void testTopicGroupsByStateStore() {
final String store2 = ProcessorStateManager.storeChangelogTopic("X", "store-2");
final String store3 = ProcessorStateManager.storeChangelogTopic("X", "store-3");
expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(
Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"),
Collections.<String, InternalTopicConfig>emptyMap(),
Collections.singletonMap(store1, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store1, Collections.<String, String>emptyMap()))));
Collections.emptySet(), mkSet("topic-1", "topic-1x", "topic-2"),
Collections.emptyMap(),
Collections.singletonMap(store1, new UnwindowedChangelogTopicConfig(store1, Collections.emptyMap()))));
expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(
Collections.<String>emptySet(), mkSet("topic-3", "topic-4"),
Collections.<String, InternalTopicConfig>emptyMap(),
Collections.singletonMap(store2, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store2, Collections.<String, String>emptyMap()))));
Collections.emptySet(), mkSet("topic-3", "topic-4"),
Collections.emptyMap(),
Collections.singletonMap(store2, new UnwindowedChangelogTopicConfig(store2, Collections.emptyMap()))));
expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(
Collections.<String>emptySet(), mkSet("topic-5"),
Collections.<String, InternalTopicConfig>emptyMap(),
Collections.singletonMap(store3, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store3, Collections.<String, String>emptyMap()))));
Collections.emptySet(), mkSet("topic-5"),
Collections.emptyMap(),
Collections.singletonMap(store3, new UnwindowedChangelogTopicConfig(store3, Collections.emptyMap()))));

assertEquals(3, topicGroups.size());
assertEquals(expectedTopicGroups, topicGroups);
Expand Down Expand Up @@ -499,12 +499,7 @@ public void shouldNotAllowNullTopicChooserWhenAddingSink() {

@Test(expected = NullPointerException.class)
public void shouldNotAllowNullNameWhenAddingProcessor() {
builder.addProcessor(null, new ProcessorSupplier() {
@Override
public Processor get() {
return null;
}
});
builder.addProcessor(null, () -> null);
}

@Test(expected = NullPointerException.class)
Expand Down Expand Up @@ -604,14 +599,14 @@ public void shouldAddInternalTopicConfigForWindowStores() {
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
final InternalTopicConfig topicConfig1 = topicsInfo.stateChangelogTopics.get("appId-store1-changelog");
final Map<String, String> properties1 = topicConfig1.getProperties(Collections.<String, String>emptyMap(), 10000);
final Map<String, String> properties1 = topicConfig1.getProperties(Collections.emptyMap(), 10000);
assertEquals(2, properties1.size());
assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties1.get(TopicConfig.CLEANUP_POLICY_CONFIG));
assertEquals("40000", properties1.get(TopicConfig.RETENTION_MS_CONFIG));
assertEquals("appId-store1-changelog", topicConfig1.name());
assertTrue(topicConfig1 instanceof WindowedChangelogTopicConfig);
final InternalTopicConfig topicConfig2 = topicsInfo.stateChangelogTopics.get("appId-store2-changelog");
final Map<String, String> properties2 = topicConfig2.getProperties(Collections.<String, String>emptyMap(), 10000);
final Map<String, String> properties2 = topicConfig2.getProperties(Collections.emptyMap(), 10000);
assertEquals(2, properties2.size());
assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties2.get(TopicConfig.CLEANUP_POLICY_CONFIG));
assertEquals("40000", properties2.get(TopicConfig.RETENTION_MS_CONFIG));
Expand All @@ -628,7 +623,7 @@ public void shouldAddInternalTopicConfigForNonWindowStores() {
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 10000);
assertEquals(1, properties.size());
assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
assertEquals("appId-store-changelog", topicConfig.name());
Expand All @@ -642,7 +637,7 @@ public void shouldAddInternalTopicConfigForRepartitionTopics() {
builder.addSource(null, "source", null, null, null, "foo");
final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 10000);
assertEquals(3, properties.size());
assertEquals(String.valueOf(-1), properties.get(TopicConfig.RETENTION_MS_CONFIG));
assertEquals(TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
Expand Down Expand Up @@ -708,32 +703,32 @@ public void shouldSortProcessorNodesCorrectly() {

assertTrue(iterator.hasNext());
InternalTopologyBuilder.AbstractNode node = (InternalTopologyBuilder.AbstractNode) iterator.next();
assertTrue(node.name.equals("source1"));
assertEquals("source1", node.name);
assertEquals(6, node.size);

assertTrue(iterator.hasNext());
node = (InternalTopologyBuilder.AbstractNode) iterator.next();
assertTrue(node.name.equals("source2"));
assertEquals("source2", node.name);
assertEquals(4, node.size);

assertTrue(iterator.hasNext());
node = (InternalTopologyBuilder.AbstractNode) iterator.next();
assertTrue(node.name.equals("processor2"));
assertEquals("processor2", node.name);
assertEquals(3, node.size);

assertTrue(iterator.hasNext());
node = (InternalTopologyBuilder.AbstractNode) iterator.next();
assertTrue(node.name.equals("processor1"));
assertEquals("processor1", node.name);
assertEquals(2, node.size);

assertTrue(iterator.hasNext());
node = (InternalTopologyBuilder.AbstractNode) iterator.next();
assertTrue(node.name.equals("processor3"));
assertEquals("processor3", node.name);
assertEquals(2, node.size);

assertTrue(iterator.hasNext());
node = (InternalTopologyBuilder.AbstractNode) iterator.next();
assertTrue(node.name.equals("sink1"));
assertEquals("sink1", node.name);
assertEquals(1, node.size);
}

Expand All @@ -760,7 +755,7 @@ public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
final Map<String, List<String>> stateStoreAndTopics = builder.stateStoreNameToSourceTopics();
final List<String> topics = stateStoreAndTopics.get(storeBuilder.name());

assertTrue("Expected to contain two topics", topics.size() == 2);
assertEquals("Expected to contain two topics", 2, topics.size());

assertTrue(topics.contains("topic-2"));
assertTrue(topics.contains("topic-3"));
Expand All @@ -781,4 +776,74 @@ public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
sameNameForSourceAndProcessor,
new MockProcessorSupplier());
}

@Test
public void shouldThrowIfNameIsNull() {
final Exception e = assertThrows(NullPointerException.class, () -> new InternalTopologyBuilder.Source(null, Collections.emptySet(), null));
assertEquals("name cannot be null", e.getMessage());
}

@Test
public void shouldThrowIfTopicAndPatternAreNull() {
final Exception e = assertThrows(IllegalArgumentException.class, () -> new InternalTopologyBuilder.Source("name", null, null));
assertEquals("Either topics or pattern must be not-null, but both are null.", e.getMessage());
}

@Test
public void shouldThrowIfBothTopicAndPatternAreNotNull() {
final Exception e = assertThrows(IllegalArgumentException.class, () -> new InternalTopologyBuilder.Source("name", Collections.emptySet(), Pattern.compile("")));
assertEquals("Either topics or pattern must be null, but both are not null.", e.getMessage());
}

@Test
public void sourceShouldBeEqualIfNameAndTopicListAreTheSame() {
final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), null);
final InternalTopologyBuilder.Source sameAsBase = new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), null);

assertThat(base, equalTo(sameAsBase));
}

@Test
public void sourceShouldBeEqualIfNameAndPatternAreTheSame() {
final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic"));
final InternalTopologyBuilder.Source sameAsBase = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic"));

assertThat(base, equalTo(sameAsBase));
}

@Test
public void sourceShouldNotBeEqualForDifferentNamesWithSameTopicList() {
final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), null);
final InternalTopologyBuilder.Source differentName = new InternalTopologyBuilder.Source("name2", Collections.singleton("topic"), null);

assertThat(base, not(equalTo(differentName)));
}

@Test
public void sourceShouldNotBeEqualForDifferentNamesWithSamePattern() {
final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic"));
final InternalTopologyBuilder.Source differentName = new InternalTopologyBuilder.Source("name2", null, Pattern.compile("topic"));

assertThat(base, not(equalTo(differentName)));
}

@Test
public void sourceShouldNotBeEqualForDifferentTopicList() {
final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), null);
final InternalTopologyBuilder.Source differentTopicList = new InternalTopologyBuilder.Source("name", Collections.emptySet(), null);
final InternalTopologyBuilder.Source differentTopic = new InternalTopologyBuilder.Source("name", Collections.singleton("topic2"), null);

assertThat(base, not(equalTo(differentTopicList)));
assertThat(base, not(equalTo(differentTopic)));
}

@Test
public void sourceShouldNotBeEqualForDifferentPattern() {
final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic"));
final InternalTopologyBuilder.Source differentPattern = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic2"));
final InternalTopologyBuilder.Source overlappingPattern = new InternalTopologyBuilder.Source("name", null, Pattern.compile("top*"));

assertThat(base, not(equalTo(differentPattern)));
assertThat(base, not(equalTo(overlappingPattern)));
}
}