From 166950971b19d3f11de0ef92d57ecdeb55c7c107 Mon Sep 17 00:00:00 2001 From: Albert Lowis Date: Sat, 1 Aug 2020 10:28:59 +0800 Subject: [PATCH 1/4] KAFKA-9273: Extract testShouldAutoShutdownOnIncompleteMetadata from StreamTableJoinIntegrationTest into its own test The main goal is to remove usage of embedded broker (EmbeddedKafkaCluster) in AbstractJoinIntegrationTest and its subclasses. This is because the tests under this class are no longer using the embedded broker, except for two. testShouldAutoShutdownOnIncompleteMetadata is one such test. Furthermore, this test does not actually perform stream-table join; it is testing an edge case of joining with a non-existent topic, so it should be in a separate test. --- ...WithIncompleteMetadataIntegrationTest.java | 109 ++++++++++++++++++ .../StreamTableJoinIntegrationTest.java | 30 ----- 2 files changed, 109 insertions(+), 30 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java new file mode 100644 index 0000000000000..177fa4370aa4b --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreamsWrapper; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import static org.junit.Assert.assertTrue; + +@Category({IntegrationTest.class}) +public class JoinWithIncompleteMetadataIntegrationTest { + @ClassRule + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + + @Rule + public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory()); + + private static final String APP_ID = "join-incomplete-metadata-integration-test"; + private static final Long COMMIT_INTERVAL = 100L; + static final Properties STREAMS_CONFIG = new Properties(); + static final String INPUT_TOPIC_RIGHT = 
"inputTopicRight"; + static final String NON_EXISTENT_INPUT_TOPIC_LEFT = "inputTopicLeft-not-exist"; + static final String OUTPUT_TOPIC = "outputTopic"; + + StreamsBuilder builder; + final ValueJoiner valueJoiner = (value1, value2) -> value1 + "-" + value2; + private KTable rightTable; + + @BeforeClass + public static void setupConfigsAndUtils() { + STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); + STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL); + } + + @Before + public void prepareTopology() throws InterruptedException { + CLUSTER.createTopics(INPUT_TOPIC_RIGHT, OUTPUT_TOPIC); + STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath()); + + builder = new StreamsBuilder(); + rightTable = builder.table(INPUT_TOPIC_RIGHT); + } + + @After + public void cleanup() throws InterruptedException { + CLUSTER.deleteAllTopicsAndWait(120000); + } + + @Test + public void testShouldAutoShutdownOnJoinWithIncompleteMetadata() throws InterruptedException { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID); + STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + + final KStream notExistStream = builder.stream(NON_EXISTENT_INPUT_TOPIC_LEFT); + + final KTable aggregatedTable = notExistStream.leftJoin(rightTable, valueJoiner) + .groupBy((key, value) -> key) + .reduce((value1, value2) -> value1 + value2); + + // Write the (continuously updating) results to the output topic. 
+ aggregatedTable.toStream().to(OUTPUT_TOPIC); + + final KafkaStreamsWrapper streams = new KafkaStreamsWrapper(builder.build(), STREAMS_CONFIG); + final IntegrationTestUtils.StateListenerStub listener = new IntegrationTestUtils.StateListenerStub(); + streams.setStreamThreadStateListener(listener); + streams.start(); + + TestUtils.waitForCondition(listener::transitToPendingShutdownSeen, "Did not seen thread state transited to PENDING_SHUTDOWN"); + + streams.close(); + assertTrue(listener.transitToPendingShutdownSeen()); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java index 66f0a04b91b55..0c6fb3bb1a7f0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java @@ -16,15 +16,12 @@ */ package org.apache.kafka.streams.integration; -import org.apache.kafka.streams.KafkaStreamsWrapper; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.test.TestRecord; import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -35,8 +32,6 @@ import java.util.Collections; import java.util.List; -import static org.junit.Assert.assertTrue; - /** * Tests all available joins of Kafka Streams DSL. 
*/ @@ -61,31 +56,6 @@ public void prepareTopology() throws InterruptedException { leftStream = builder.stream(INPUT_TOPIC_LEFT); } - @Test - public void testShouldAutoShutdownOnIncompleteMetadata() throws InterruptedException { - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-incomplete"); - STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - - final KStream notExistStream = builder.stream(INPUT_TOPIC_LEFT + "-not-existed"); - - final KTable aggregatedTable = notExistStream.leftJoin(rightTable, valueJoiner) - .groupBy((key, value) -> key) - .reduce((value1, value2) -> value1 + value2); - - // Write the (continuously updating) results to the output topic. - aggregatedTable.toStream().to(OUTPUT_TOPIC); - - final KafkaStreamsWrapper streams = new KafkaStreamsWrapper(builder.build(), STREAMS_CONFIG); - final IntegrationTestUtils.StateListenerStub listener = new IntegrationTestUtils.StateListenerStub(); - streams.setStreamThreadStateListener(listener); - streams.start(); - - TestUtils.waitForCondition(listener::transitToPendingShutdownSeen, "Did not seen thread state transited to PENDING_SHUTDOWN"); - - streams.close(); - assertTrue(listener.transitToPendingShutdownSeen()); - } - @Test public void testInner() { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner"); From 08528e90fc3538c40cff6edd3856fb595b7a6b14 Mon Sep 17 00:00:00 2001 From: Albert Lowis Date: Sat, 1 Aug 2020 14:26:45 +0800 Subject: [PATCH 2/4] KAFKA-9273: Extract shouldNotAccessJoinStoresWhenGivingName from StreamStreamJoinIntegrationTest into its own test. shouldNotAccessJoinStoresWhenGivingName is the last unit test that has a dependency on the embedded broker in AbstractJoinIntegrationTest. This test does not actually test the behaviour of stream-stream join; it is testing an edge case of accessing the store of a join, so it should be in a separate test. 
--- .../integration/JoinStoreIntegrationTest.java | 114 ++++++++++++++++++ .../StreamStreamJoinIntegrationTest.java | 38 ------ 2 files changed, 114 insertions(+), 38 deletions(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java new file mode 100644 index 0000000000000..c519117fabe20 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StoreQueryParameters; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import static java.time.Duration.ofMillis; +import static org.junit.Assert.assertThrows; + +@Category({IntegrationTest.class}) +public class JoinStoreIntegrationTest { + @ClassRule + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + + @Rule + public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory()); + + private static final String APP_ID = "join-store-integration-test"; + private static final Long COMMIT_INTERVAL = 100L; + static final Properties STREAMS_CONFIG = new Properties(); + static final String INPUT_TOPIC_RIGHT = "inputTopicRight"; + static final String INPUT_TOPIC_LEFT = "inputTopicLeft"; + static final String OUTPUT_TOPIC = "outputTopic"; + + StreamsBuilder builder; + + @BeforeClass + public static void 
setupConfigsAndUtils() { + STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); + STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL); + } + + @Before + public void prepareTopology() throws InterruptedException { + CLUSTER.createTopics(INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC); + STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath()); + + builder = new StreamsBuilder(); + } + + @After + public void cleanup() throws InterruptedException { + CLUSTER.deleteAllTopicsAndWait(120000); + } + + @Test + public void shouldNotAccessJoinStoresWhenGivingName() throws InterruptedException { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-no-store-access"); + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream left = builder.stream(INPUT_TOPIC_LEFT, Consumed.with(Serdes.String(), Serdes.Integer())); + final KStream right = builder.stream(INPUT_TOPIC_RIGHT, Consumed.with(Serdes.String(), Serdes.Integer())); + final CountDownLatch latch = new CountDownLatch(1); + + left.join( + right, + (value1, value2) -> value1 + value2, + JoinWindows.of(ofMillis(100)), + StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("join-store")); + + try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), STREAMS_CONFIG)) { + kafkaStreams.setStateListener((newState, oldState) -> { + if (newState == KafkaStreams.State.RUNNING) { + latch.countDown(); + } + }); + + kafkaStreams.start(); + latch.await(); + assertThrows(InvalidStateStoreException.class, () -> kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store", 
QueryableStoreTypes.keyValueStore()))); + } + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java index c503a98c3b1f5..67db9bde266a1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java @@ -16,17 +16,10 @@ */ package org.apache.kafka.streams.integration; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.errors.InvalidStateStoreException; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.StreamJoined; -import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.test.TestRecord; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.MockMapper; @@ -40,11 +33,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.CountDownLatch; -import static java.time.Duration.ofMillis; import static java.time.Duration.ofSeconds; -import static org.junit.Assert.assertThrows; /** * Tests all available joins of Kafka Streams DSL. 
@@ -70,34 +60,6 @@ public void prepareTopology() throws InterruptedException { rightStream = builder.stream(INPUT_TOPIC_RIGHT); } - @Test - public void shouldNotAccessJoinStoresWhenGivingName() throws InterruptedException { - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-no-store-access"); - final StreamsBuilder builder = new StreamsBuilder(); - - final KStream left = builder.stream(INPUT_TOPIC_LEFT, Consumed.with(Serdes.String(), Serdes.Integer())); - final KStream right = builder.stream(INPUT_TOPIC_RIGHT, Consumed.with(Serdes.String(), Serdes.Integer())); - final CountDownLatch latch = new CountDownLatch(1); - - left.join( - right, - (value1, value2) -> value1 + value2, - JoinWindows.of(ofMillis(100)), - StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("join-store")); - - try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), STREAMS_CONFIG)) { - kafkaStreams.setStateListener((newState, oldState) -> { - if (newState == KafkaStreams.State.RUNNING) { - latch.countDown(); - } - }); - - kafkaStreams.start(); - latch.await(); - assertThrows(InvalidStateStoreException.class, () -> kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store", QueryableStoreTypes.keyValueStore()))); - } - } - @Test public void testInner() { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner"); From 865379e0182b685ba128cbd5d3491e9536bdfd38 Mon Sep 17 00:00:00 2001 From: Albert Lowis Date: Sat, 1 Aug 2020 14:32:20 +0800 Subject: [PATCH 3/4] KAFKA-9273: Remove dependency on embedded broker from AbstractJoinIntegrationTest. The tests under AbstractJoinIntegrationTest previously requires embedded broker, but it is better to use TopologyTestDriver instead. Existing stream-stream join tests that still make use of embedded broker is moved to use TopologyTestDriver, by specifying it under BOOTSTRAP_SERVERS_CONFIG. 
And then the old EmbeddedKafkaCluster in AbstractJoinIntegrationTest is safely removed. --- .../integration/AbstractJoinIntegrationTest.java | 13 ------------- .../StreamStreamJoinIntegrationTest.java | 7 +++++++ 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java index 639dda26ee904..f737f9e021c8e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java @@ -29,7 +29,6 @@ import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.TopologyTestDriver; -import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @@ -37,9 +36,7 @@ import org.apache.kafka.streams.test.TestRecord; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestUtils; -import org.junit.After; import org.junit.BeforeClass; -import org.junit.ClassRule; import org.junit.Rule; import org.junit.experimental.categories.Category; import org.junit.rules.TemporaryFolder; @@ -66,9 +63,6 @@ @Category({IntegrationTest.class}) @RunWith(value = Parameterized.class) public abstract class AbstractJoinIntegrationTest { - @ClassRule - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); - @Rule public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory()); @@ -123,25 +117,18 @@ public static Collection data() { public static void setupConfigsAndUtils() { STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - 
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL); } void prepareEnvironment() throws InterruptedException { - CLUSTER.createTopics(INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC); - if (!cacheEnabled) { STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); } STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath()); } - @After - public void cleanup() throws InterruptedException { - CLUSTER.deleteAllTopicsAndWait(120000); - } void runTestWithDriver(final List>> expectedResult) { runTestWithDriver(expectedResult, null); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java index 67db9bde266a1..944592445ddcf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java @@ -63,6 +63,7 @@ public void prepareTopology() throws InterruptedException { @Test public void testInner() { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner"); + STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "topology_driver:0000"); final List>> expectedResult = Arrays.asList( null, @@ -104,6 +105,7 @@ public void testInner() { @Test public void testInnerRepartitioned() { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-repartitioned"); + STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "topology_driver:0000"); final List>> expectedResult = Arrays.asList( null, @@ -148,6 +150,7 @@ public void 
testInnerRepartitioned() { @Test public void testLeft() { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left"); + STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "topology_driver:0000"); final List>> expectedResult = Arrays.asList( null, @@ -189,6 +192,7 @@ public void testLeft() { @Test public void testLeftRepartitioned() { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left-repartitioned"); + STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "topology_driver:0000"); final List>> expectedResult = Arrays.asList( null, @@ -233,6 +237,7 @@ public void testLeftRepartitioned() { @Test public void testOuter() { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer"); + STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "topology_driver:0000"); final List>> expectedResult = Arrays.asList( null, @@ -274,6 +279,7 @@ public void testOuter() { @Test public void testOuterRepartitioned() { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer"); + STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "topology_driver:0000"); final List>> expectedResult = Arrays.asList( null, @@ -318,6 +324,7 @@ public void testOuterRepartitioned() { @Test public void testMultiInner() { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-multi-inner"); + STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "topology_driver:0000"); final List>> expectedResult = Arrays.asList( null, From af05f07cda3b0cc232e2ed89d5f3d0f91fcf416a Mon Sep 17 00:00:00 2001 From: Albert Lowis Date: Thu, 13 Aug 2020 13:34:26 +0800 Subject: [PATCH 4/4] Centralize setting of BOOTSTRAP_SERVERS_CONFIG inside AbstractJoinIntegrationTest. Previously the setting of BOOTSTRAP_SERVERS_CONFIG is added on each test method, but this can actually be extracted to the BeforeClass method which is setupConfigsAndUtils in AbstractJoinIntegrationTest. 
--- .../streams/integration/AbstractJoinIntegrationTest.java | 1 + .../integration/StreamStreamJoinIntegrationTest.java | 7 ------- .../integration/StreamTableJoinIntegrationTest.java | 2 -- 3 files changed, 1 insertion(+), 9 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java index f737f9e021c8e..d7e19c7e9e89a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java @@ -116,6 +116,7 @@ public static Collection data() { @BeforeClass public static void setupConfigsAndUtils() { + STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "topology_driver:0000"); STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java index 944592445ddcf..67db9bde266a1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java @@ -63,7 +63,6 @@ public void prepareTopology() throws InterruptedException { @Test public void testInner() { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner"); - STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "topology_driver:0000"); final List>> expectedResult = Arrays.asList( null, @@ -105,7 +104,6 @@ public void testInner() { @Test public void testInnerRepartitioned() { 
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-repartitioned"); - STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "topology_driver:0000"); final List>> expectedResult = Arrays.asList( null, @@ -150,7 +148,6 @@ public void testInnerRepartitioned() { @Test public void testLeft() { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left"); - STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "topology_driver:0000"); final List>> expectedResult = Arrays.asList( null, @@ -192,7 +189,6 @@ public void testLeft() { @Test public void testLeftRepartitioned() { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left-repartitioned"); - STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "topology_driver:0000"); final List>> expectedResult = Arrays.asList( null, @@ -237,7 +233,6 @@ public void testLeftRepartitioned() { @Test public void testOuter() { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer"); - STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "topology_driver:0000"); final List>> expectedResult = Arrays.asList( null, @@ -279,7 +274,6 @@ public void testOuter() { @Test public void testOuterRepartitioned() { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer"); - STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "topology_driver:0000"); final List>> expectedResult = Arrays.asList( null, @@ -324,7 +318,6 @@ public void testOuterRepartitioned() { @Test public void testMultiInner() { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-multi-inner"); - STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "topology_driver:0000"); final List>> expectedResult = Arrays.asList( null, diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java index 0c6fb3bb1a7f0..0f7e8aa95f012 
100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java @@ -59,7 +59,6 @@ public void prepareTopology() throws InterruptedException { @Test public void testInner() { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner"); - STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "topology_driver:0000"); final List>> expectedResult = Arrays.asList( null, @@ -86,7 +85,6 @@ public void testInner() { @Test public void testLeft() { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left"); - STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "topology_driver:0000"); final List>> expectedResult = Arrays.asList( null,