From 67f8f8b99e9a451efa5e02df7f2597e63c87ec21 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 16 Apr 2019 18:14:22 -0700 Subject: [PATCH 1/8] KAFKA-8240: Fix NPE in Source.equals() --- .../processor/internals/InternalTopologyBuilder.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 792df532acd0f..b0d44eaf4ec5b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -1480,7 +1480,8 @@ public boolean equals(final Object o) { // omit successor to avoid infinite loops return name.equals(source.name) && topics.equals(source.topics) - && topicPattern.equals(source.topicPattern); + && (topicPattern == null && source.topicPattern == null + || topicPattern != null && topicPattern.equals(source.topicPattern)); } @Override @@ -1653,7 +1654,9 @@ public boolean equals(final Object o) { final Subtopology that = (Subtopology) o; return id == that.id - && nodes.equals(that.nodes); + // convert both TreeSets to arrays to ensure .equals() is used recursively + // otherwise, the provides NODE_COMPARATOR is used what might result in incorrect comparison + && Arrays.equals(nodes.toArray(), that.nodes.toArray()); } @Override @@ -1796,7 +1799,9 @@ public boolean equals(final Object o) { } final TopologyDescription that = (TopologyDescription) o; - return subtopologies.equals(that.subtopologies) + // convert both TreeSets to arrays to ensure .equals() is used recursively + // otherwise, the provides SUBTOPOLOGY_COMPARATOR is used what might result in incorrect comparison + return Arrays.equals(subtopologies.toArray(), that.subtopologies.toArray()) && globalStores.equals(that.globalStores); } From ebde0b925f51ea01be2922b2c9a3d5dbb3cff9d4 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 17 Apr 2019 00:20:32 -0700 Subject: [PATCH 2/8] Fix --- .../internals/InternalTopologyBuilder.java | 18 +++++++++------- .../InternalTopologyBuilderTest.java | 21 ++++++++++++++++--- 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index b0d44eaf4ec5b..f6d7774bfcb04 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -1281,6 +1281,9 @@ private static class NodeComparator implements Comparator Date: Wed, 17 Apr 2019 00:24:00 -0700 Subject: [PATCH 3/8] Restore comment --- .../streams/processor/internals/InternalTopologyBuilder.java | 1 + 1 file changed, 1 insertion(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index f6d7774bfcb04..048a90b623204 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -1480,6 +1480,7 @@ public boolean equals(final Object o) { } final Source source = (Source) o; + // omit successor to avoid infinite loops return name.equals(source.name) && topics.equals(source.topics) && (topicPattern == null && source.topicPattern == null From 33b9e3a2800f06e5ef1ddf15ef99998cd0306ac6 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 19 Apr 2019 19:01:25 -0700 Subject: [PATCH 4/8] Github comments --- .../internals/InternalTopologyBuilderTest.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 9fb02e76a75a2..80a144e25793d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -784,7 +784,7 @@ public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() { } @Test - public void shouldCompareSourceNode() { + public void shouldCompareSourceNodeWithTopicList() { final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), null); final InternalTopologyBuilder.Source sameAsBase = new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), null); final InternalTopologyBuilder.Source differentName = new InternalTopologyBuilder.Source("name2", Collections.singleton("topic"), null); @@ -796,4 +796,18 @@ public void shouldCompareSourceNode() { assertThat(base, not(equalTo(differentTopicList))); assertThat(base, not(equalTo(differentTopic))); } + + @Test + public void shouldCompareSourceNodeWithTopicPattern() { + final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic")); + final InternalTopologyBuilder.Source sameAsBase = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic")); + final InternalTopologyBuilder.Source differentName = new InternalTopologyBuilder.Source("name2", null, Pattern.compile("topic")); + final InternalTopologyBuilder.Source differentPattern = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic2")); + final InternalTopologyBuilder.Source overlappingPattern = new InternalTopologyBuilder.Source("name", Collections.singleton("top*"), null); + + assertThat(base, equalTo(sameAsBase)); + assertThat(base, not(equalTo(differentName))); + assertThat(base, not(equalTo(differentPattern))); + assertThat(base, not(equalTo(overlappingPattern))); + } } From ea3e4981b680f9c1986960e2b5922bbea9815c98 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Sat, 20 Apr 2019 13:42:43 -0700 Subject: [PATCH 5/8] Github comments --- .../internals/InternalTopologyBuilder.java | 13 +++- .../InternalTopologyBuilderTest.java | 72 ++++++++++--------- 2 files changed, 51 insertions(+), 34 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 048a90b623204..dbc68fcdec28d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -1402,6 +1402,7 @@ public abstract static class AbstractNode implements TopologyDescription.Node { int size; AbstractNode(final String name) { + Objects.requireNonNull(name); this.name = name; this.size = 1; } @@ -1438,6 +1439,13 @@ public Source(final String name, final Set topics, final Pattern pattern) { super(name); + if (topics == null && pattern == null) { + throw new IllegalArgumentException("Either topics or pattern must be not-null, but both are null."); + } + if (topics != null && pattern != null) { + throw new IllegalArgumentException("Either topics or pattern must be null, but both are not null."); + } + this.topics = topics; this.topicPattern = pattern; } @@ -1482,9 +1490,10 @@ public boolean equals(final Object o) { final Source source = (Source) o; // omit successor to avoid infinite loops return name.equals(source.name) - && topics.equals(source.topics) + && (topics == null && source.topics == null + || topics != null && topics.equals(source.topics)) && (topicPattern == null && source.topicPattern == null - || topicPattern != null && topicPattern.equals(source.topicPattern)); + || topicPattern != null && topicPattern.pattern().equals(source.topicPattern.pattern())); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 80a144e25793d..460c5d19d3d0b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -23,8 +23,6 @@ import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.errors.TopologyException; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.state.KeyValueStore; @@ -354,9 +352,9 @@ public void testTopicGroups() { final Map topicGroups = builder.topicGroups(); final Map expectedTopicGroups = new HashMap<>(); - expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.emptyMap(), Collections.emptyMap())); - expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-3", "topic-4"), Collections.emptyMap(), Collections.emptyMap())); - expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-5"), Collections.emptyMap(), Collections.emptyMap())); + expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.emptyMap(), Collections.emptyMap())); + expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-3", "topic-4"), Collections.emptyMap(), Collections.emptyMap())); + expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-5"), Collections.emptyMap(), Collections.emptyMap())); assertEquals(3, topicGroups.size()); assertEquals(expectedTopicGroups, topicGroups); @@ -394,17 +392,17 @@ public void testTopicGroupsByStateStore() { final String store2 = ProcessorStateManager.storeChangelogTopic("X", "store-2"); final String store3 = ProcessorStateManager.storeChangelogTopic("X", "store-3"); expectedTopicGroups.put(0, new InternalTopologyBuilder.TopicsInfo( - Collections.emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), - Collections.emptyMap(), - Collections.singletonMap(store1, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store1, Collections.emptyMap())))); + Collections.emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), + Collections.emptyMap(), + Collections.singletonMap(store1, new UnwindowedChangelogTopicConfig(store1, Collections.emptyMap())))); expectedTopicGroups.put(1, new InternalTopologyBuilder.TopicsInfo( - Collections.emptySet(), mkSet("topic-3", "topic-4"), - Collections.emptyMap(), - Collections.singletonMap(store2, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store2, Collections.emptyMap())))); + Collections.emptySet(), mkSet("topic-3", "topic-4"), + Collections.emptyMap(), + Collections.singletonMap(store2, new UnwindowedChangelogTopicConfig(store2, Collections.emptyMap())))); expectedTopicGroups.put(2, new InternalTopologyBuilder.TopicsInfo( - Collections.emptySet(), mkSet("topic-5"), - Collections.emptyMap(), - Collections.singletonMap(store3, (InternalTopicConfig) new UnwindowedChangelogTopicConfig(store3, Collections.emptyMap())))); + Collections.emptySet(), mkSet("topic-5"), + Collections.emptyMap(), + Collections.singletonMap(store3, new UnwindowedChangelogTopicConfig(store3, Collections.emptyMap())))); assertEquals(3, topicGroups.size()); assertEquals(expectedTopicGroups, topicGroups); @@ -500,12 +498,7 @@ public void shouldNotAllowNullTopicChooserWhenAddingSink() { @Test(expected = NullPointerException.class) public void shouldNotAllowNullNameWhenAddingProcessor() { - builder.addProcessor(null, new ProcessorSupplier() { - @Override - public Processor get() { - return null; - } - }); + builder.addProcessor(null, () -> null); } @Test(expected = NullPointerException.class) @@ -605,14 +598,14 @@ public void shouldAddInternalTopicConfigForWindowStores() { final Map topicGroups = builder.topicGroups(); final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next(); final InternalTopicConfig topicConfig1 = topicsInfo.stateChangelogTopics.get("appId-store1-changelog"); - final Map properties1 = topicConfig1.getProperties(Collections.emptyMap(), 10000); + final Map properties1 = topicConfig1.getProperties(Collections.emptyMap(), 10000); assertEquals(2, properties1.size()); assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties1.get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals("40000", properties1.get(TopicConfig.RETENTION_MS_CONFIG)); assertEquals("appId-store1-changelog", topicConfig1.name()); assertTrue(topicConfig1 instanceof WindowedChangelogTopicConfig); final InternalTopicConfig topicConfig2 = topicsInfo.stateChangelogTopics.get("appId-store2-changelog"); - final Map properties2 = topicConfig2.getProperties(Collections.emptyMap(), 10000); + final Map properties2 = topicConfig2.getProperties(Collections.emptyMap(), 10000); assertEquals(2, properties2.size()); assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties2.get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals("40000", properties2.get(TopicConfig.RETENTION_MS_CONFIG)); @@ -629,7 +622,7 @@ public void shouldAddInternalTopicConfigForNonWindowStores() { final Map topicGroups = builder.topicGroups(); final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next(); final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog"); - final Map properties = topicConfig.getProperties(Collections.emptyMap(), 10000); + final Map properties = topicConfig.getProperties(Collections.emptyMap(), 10000); assertEquals(1, properties.size()); assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG)); assertEquals("appId-store-changelog", topicConfig.name()); @@ -643,7 +636,7 @@ public void shouldAddInternalTopicConfigForRepartitionTopics() { builder.addSource(null, "source", null, null, null, "foo"); final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next(); final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo"); - final Map properties = topicConfig.getProperties(Collections.emptyMap(), 10000); + final Map properties = topicConfig.getProperties(Collections.emptyMap(), 10000); assertEquals(3, properties.size()); assertEquals(String.valueOf(-1), properties.get(TopicConfig.RETENTION_MS_CONFIG)); assertEquals(TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG)); @@ -709,32 +702,32 @@ public void shouldSortProcessorNodesCorrectly() { assertTrue(iterator.hasNext()); InternalTopologyBuilder.AbstractNode node = (InternalTopologyBuilder.AbstractNode) iterator.next(); - assertTrue(node.name.equals("source1")); + assertEquals("source1", node.name); assertEquals(6, node.size); assertTrue(iterator.hasNext()); node = (InternalTopologyBuilder.AbstractNode) iterator.next(); - assertTrue(node.name.equals("source2")); + assertEquals("source2", node.name); assertEquals(4, node.size); assertTrue(iterator.hasNext()); node = (InternalTopologyBuilder.AbstractNode) iterator.next(); - assertTrue(node.name.equals("processor2")); + assertEquals("processor2", node.name); assertEquals(3, node.size); assertTrue(iterator.hasNext()); node = (InternalTopologyBuilder.AbstractNode) iterator.next(); - assertTrue(node.name.equals("processor1")); + assertEquals("processor1", node.name); assertEquals(2, node.size); assertTrue(iterator.hasNext()); node = (InternalTopologyBuilder.AbstractNode) iterator.next(); - assertTrue(node.name.equals("processor3")); + assertEquals("processor3", node.name); assertEquals(2, node.size); assertTrue(iterator.hasNext()); node = (InternalTopologyBuilder.AbstractNode) iterator.next(); - assertTrue(node.name.equals("sink1")); + assertEquals("sink1", node.name); assertEquals(1, node.size); } @@ -761,7 +754,7 @@ public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception { final Map> stateStoreAndTopics = builder.stateStoreNameToSourceTopics(); final List topics = stateStoreAndTopics.get(storeBuilder.name()); - assertTrue("Expected to contain two topics", topics.size() == 2); + assertEquals("Expected to contain two topics", 2, topics.size()); assertTrue(topics.contains("topic-2")); assertTrue(topics.contains("topic-3")); @@ -783,6 +776,21 @@ public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() { new MockProcessorSupplier()); } + @Test(expected = NullPointerException.class) + public void shouldThrowIfNameIsNull() { + new InternalTopologyBuilder.Source(null, Collections.emptySet(), null); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIfTopicAndPatternAreNull() { + new InternalTopologyBuilder.Source("name", null, null); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowIfBothTopicAndPatternAreNotNull() { + new InternalTopologyBuilder.Source("name", Collections.emptySet(), Pattern.compile("")); + } + @Test public void shouldCompareSourceNodeWithTopicList() { final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), null); @@ -803,7 +811,7 @@ public void shouldCompareSourceNodeWithTopicPattern() { final InternalTopologyBuilder.Source sameAsBase = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic")); final InternalTopologyBuilder.Source differentName = new InternalTopologyBuilder.Source("name2", null, Pattern.compile("topic")); final InternalTopologyBuilder.Source differentPattern = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic2")); - final InternalTopologyBuilder.Source overlappingPattern = new InternalTopologyBuilder.Source("name", Collections.singleton("top*"), null); + final InternalTopologyBuilder.Source overlappingPattern = new InternalTopologyBuilder.Source("name", null, Pattern.compile("top*")); assertThat(base, equalTo(sameAsBase)); assertThat(base, not(equalTo(differentName))); From ff0e74e6ece491b652cff6c895b4c20b51585389 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Sun, 21 Apr 2019 17:59:30 -0700 Subject: [PATCH 6/8] Fix failing test --- .../streams/processor/internals/InternalTopologyBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index dbc68fcdec28d..907a1325ef126 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -282,7 +282,7 @@ private boolean isMatch(final String topic) { @Override Source describe() { - return new Source(name, new HashSet<>(topics), pattern); + return new Source(name, topics.size() == 0 ? null : new HashSet<>(topics), pattern); } } From 5f2a96ee553ae1c9c7a01b91258dc7848a8abcb7 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 23 Apr 2019 09:52:34 -0700 Subject: [PATCH 7/8] Github comments --- .../InternalTopologyBuilderTest.java | 51 ++++++++++++++----- 1 file changed, 39 insertions(+), 12 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 460c5d19d3d0b..189a044983257 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -55,6 +55,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -781,40 +782,66 @@ public void shouldThrowIfNameIsNull() { new InternalTopologyBuilder.Source(null, Collections.emptySet(), null); } - @Test(expected = IllegalArgumentException.class) + @Test public void shouldThrowIfTopicAndPatternAreNull() { - new InternalTopologyBuilder.Source("name", null, null); + final Exception e = assertThrows(IllegalArgumentException.class, () -> new InternalTopologyBuilder.Source("name", null, null)); + assertEquals("Either topics or pattern must be not-null, but both are null.", e.getMessage()); } - @Test(expected = IllegalArgumentException.class) + @Test public void shouldThrowIfBothTopicAndPatternAreNotNull() { - new InternalTopologyBuilder.Source("name", Collections.emptySet(), Pattern.compile("")); + final Exception e = assertThrows(IllegalArgumentException.class, () -> new InternalTopologyBuilder.Source("name", Collections.emptySet(), Pattern.compile(""))); + assertEquals("Either topics or pattern must be null, but both are not null.", e.getMessage()); } @Test - public void shouldCompareSourceNodeWithTopicList() { + public void sourceShouldBeEqualIfNameAndTopicListAreTheSame() { final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), null); final InternalTopologyBuilder.Source sameAsBase = new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), null); + + assertThat(base, equalTo(sameAsBase)); + } + + @Test + public void sourceShouldBeEqualIfNameAndPatternAreTheSame() { + final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic")); + final InternalTopologyBuilder.Source sameAsBase = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic")); + + assertThat(base, equalTo(sameAsBase)); + } + + @Test + public void sourceShouldNotBeEqualForDifferentNamesWithSameTopicList() { + final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), null); final InternalTopologyBuilder.Source differentName = new InternalTopologyBuilder.Source("name2", Collections.singleton("topic"), null); + + assertThat(base, not(equalTo(differentName))); + } + + @Test + public void sourceShouldNotBeEqualForDifferentNamesWithSamePattern() { + final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic")); + final InternalTopologyBuilder.Source differentName = new InternalTopologyBuilder.Source("name2", null, Pattern.compile("topic")); + + assertThat(base, not(equalTo(differentName))); + } + + @Test + public void sourceShouldNotBeEqualForDifferentTopicList() { + final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", Collections.singleton("topic"), null); final InternalTopologyBuilder.Source differentTopicList = new InternalTopologyBuilder.Source("name", Collections.emptySet(), null); final InternalTopologyBuilder.Source differentTopic = new InternalTopologyBuilder.Source("name", Collections.singleton("topic2"), null); - assertThat(base, equalTo(sameAsBase)); - assertThat(base, not(equalTo(differentName))); assertThat(base, not(equalTo(differentTopicList))); assertThat(base, not(equalTo(differentTopic))); } @Test - public void shouldCompareSourceNodeWithTopicPattern() { + public void sourceShouldNotBeEqualForDifferentPattern() { final InternalTopologyBuilder.Source base = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic")); - final InternalTopologyBuilder.Source sameAsBase = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic")); - final InternalTopologyBuilder.Source differentName = new InternalTopologyBuilder.Source("name2", null, Pattern.compile("topic")); final InternalTopologyBuilder.Source differentPattern = new InternalTopologyBuilder.Source("name", null, Pattern.compile("topic2")); final InternalTopologyBuilder.Source overlappingPattern = new InternalTopologyBuilder.Source("name", null, Pattern.compile("top*")); - assertThat(base, equalTo(sameAsBase)); - assertThat(base, not(equalTo(differentName))); assertThat(base, not(equalTo(differentPattern))); assertThat(base, not(equalTo(overlappingPattern))); } From ff1c57004a83c1e8e9a379d556afe94e04ca3c50 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Sat, 27 Apr 2019 14:35:01 -0700 Subject: [PATCH 8/8] Github comments --- .../streams/processor/internals/InternalTopologyBuilder.java | 2 +- .../processor/internals/InternalTopologyBuilderTest.java | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 907a1325ef126..2d527e51f6b65 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -1402,7 +1402,7 @@ public abstract static class AbstractNode implements TopologyDescription.Node { int size; AbstractNode(final String name) { - Objects.requireNonNull(name); + Objects.requireNonNull(name, "name cannot be null"); this.name = name; this.size = 1; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 189a044983257..b86211fbcd238 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -777,9 +777,10 @@ public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() { new MockProcessorSupplier()); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowIfNameIsNull() { - new InternalTopologyBuilder.Source(null, Collections.emptySet(), null); + final Exception e = assertThrows(NullPointerException.class, () -> new InternalTopologyBuilder.Source(null, Collections.emptySet(), null)); + assertEquals("name cannot be null", e.getMessage()); } @Test