diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 9fff8ccca0456..f9ab3f1244959 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -91,14 +91,14 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
         try {
             context.setRecordContext(entry.recordContext());
             if (flushListener != null) {
                 final V oldValue = sendOldValues ? serdes.valueFrom(underlying.get(entry.key())) : null;
+                underlying.put(entry.key(), entry.newValue());
                 flushListener.apply(serdes.keyFrom(entry.key().get()), serdes.valueFrom(entry.newValue()), oldValue);
-
+            } else {
+                underlying.put(entry.key(), entry.newValue());
             }
-            underlying.put(entry.key(), entry.newValue());
         } finally {
             context.setRecordContext(current);
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
new file mode 100644
index 0000000000000..16d2611a8a37d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Tests all available joins of Kafka Streams DSL.
+ */
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public abstract class AbstractJoinIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    @Parameterized.Parameters(name = "caching enabled = {0}")
+    public static Collection<Object[]> data() {
+        List<Object[]> values = new ArrayList<>();
+        for (boolean cacheEnabled : Arrays.asList(true, false))
+            values.add(new Object[] {cacheEnabled});
+        return values;
+    }
+
+    static String appID;
+
+    private static final Long COMMIT_INTERVAL = 100L;
+    static final Properties STREAMS_CONFIG = new Properties();
+    static final String INPUT_TOPIC_RIGHT = "inputTopicRight";
+    static final String INPUT_TOPIC_LEFT = "inputTopicLeft";
+    static final String OUTPUT_TOPIC = "outputTopic";
+    private final long anyUniqueKey = 0L;
+
+    private final static Properties PRODUCER_CONFIG = new Properties();
+    private final static Properties RESULT_CONSUMER_CONFIG = new Properties();
+
+    private KafkaProducer<Long, String> producer;
+    private KafkaStreams streams;
+
+    StreamsBuilder builder;
+    int numRecordsExpected = 0;
+    AtomicBoolean finalResultReached = new AtomicBoolean(false);
+
+    private final List<Input<String>> input = Arrays.asList(
+        new Input<>(INPUT_TOPIC_LEFT, (String) null),
+        new Input<>(INPUT_TOPIC_RIGHT, (String) null),
+        new Input<>(INPUT_TOPIC_LEFT, "A"),
+        new Input<>(INPUT_TOPIC_RIGHT, "a"),
+        new Input<>(INPUT_TOPIC_LEFT, "B"),
+        new Input<>(INPUT_TOPIC_RIGHT, "b"),
+        new Input<>(INPUT_TOPIC_LEFT, (String) null),
+        new Input<>(INPUT_TOPIC_RIGHT, (String) null),
+        new Input<>(INPUT_TOPIC_LEFT, "C"),
+        new Input<>(INPUT_TOPIC_RIGHT, "c"),
+        new Input<>(INPUT_TOPIC_RIGHT, (String) null),
+        new Input<>(INPUT_TOPIC_LEFT, (String) null),
+        new Input<>(INPUT_TOPIC_RIGHT, (String) null),
+        new Input<>(INPUT_TOPIC_RIGHT, "d"),
+        new Input<>(INPUT_TOPIC_LEFT, "D")
+    );
+
+    final ValueJoiner<String, String, String> valueJoiner = new ValueJoiner<String, String, String>() {
+        @Override
+        public String apply(final String value1, final String value2) {
+            return value1 + "-" + value2;
+        }
+    };
+
+    final boolean cacheEnabled;
+
+    AbstractJoinIntegrationTest(boolean cacheEnabled) {
+        this.cacheEnabled = cacheEnabled;
+    }
+
+    @BeforeClass
+    public static void setupConfigsAndUtils() {
+        PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
+        PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
+        PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL);
+    }
+
+    void prepareEnvironment() throws InterruptedException {
+        CLUSTER.createTopics(INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
+
+        if (!cacheEnabled)
+            STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+
+        STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+
+        producer = new KafkaProducer<>(PRODUCER_CONFIG);
+    }
+
+    @After
+    public void cleanup() throws InterruptedException {
+        CLUSTER.deleteTopicsAndWait(120000, INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
+    }
+
+    private void checkResult(final String outputTopic, final List<String> expectedResult) throws InterruptedException {
+        final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), 30 * 1000L);
+        assertThat(result, is(expectedResult));
+    }
+
+    private void checkResult(final String outputTopic, final String expectedFinalResult, final int expectedTotalNumRecords) throws InterruptedException {
+        final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedTotalNumRecords, 30 * 1000L);
+        assertThat(result.get(result.size() - 1), is(expectedFinalResult));
+    }
+
+    /*
+     * Runs the actual test. Checks the result after each input record to ensure fixed processing order.
+     * If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry
+     */
+    void runTest(final List<List<String>> expectedResult) throws Exception {
+        runTest(expectedResult, null);
+    }
+
+
+    /*
+     * Runs the actual test. Checks the result after each input record to ensure fixed processing order.
+     * If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry
+     */
+    void runTest(final List<List<String>> expectedResult, final String storeName) throws Exception {
+        assert expectedResult.size() == input.size();
+
+        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
+        streams = new KafkaStreams(builder.build(), new StreamsConfig(STREAMS_CONFIG));
+
+        String expectedFinalResult = null;
+
+        try {
+            streams.start();
+
+            long ts = System.currentTimeMillis();
+
+            final Iterator<List<String>> resultIterator = expectedResult.iterator();
+            for (final Input<String> singleInput : input) {
+                producer.send(new ProducerRecord<>(singleInput.topic, null, ++ts, singleInput.record.key, singleInput.record.value)).get();
+
+                List<String> expected = resultIterator.next();
+
+                if (expected != null) {
+                    checkResult(OUTPUT_TOPIC, expected);
+                    expectedFinalResult = expected.get(expected.size() - 1);
+                }
+            }
+
+            if (storeName != null) {
+                checkQueryableStore(storeName, expectedFinalResult);
+            }
+        } finally {
+            streams.close();
+        }
+    }
+
+    /*
+     * Runs the actual test. Checks the final result only after expected number of records have been consumed.
+     */
+    void runTest(final String expectedFinalResult) throws Exception {
+        runTest(expectedFinalResult, null);
+    }
+
+    /*
+     * Runs the actual test. Checks the final result only after expected number of records have been consumed.
+     */
+    void runTest(final String expectedFinalResult, final String storeName) throws Exception {
+        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
+        streams = new KafkaStreams(builder.build(), new StreamsConfig(STREAMS_CONFIG));
+
+        try {
+            streams.start();
+
+            long ts = System.currentTimeMillis();
+
+            for (final Input<String> singleInput : input) {
+                producer.send(new ProducerRecord<>(singleInput.topic, null, ++ts, singleInput.record.key, singleInput.record.value)).get();
+            }
+
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return finalResultReached.get();
+                }
+            }, "Never received expected final result.");
+
+            checkResult(OUTPUT_TOPIC, expectedFinalResult, numRecordsExpected);
+
+            if (storeName != null) {
+                checkQueryableStore(storeName, expectedFinalResult);
+            }
+        } finally {
+            streams.close();
+        }
+    }
+
+    /*
+     * Checks the embedded queryable state store snapshot
+     */
+    private void checkQueryableStore(final String queryableName, final String expectedFinalResult) {
+        final ReadOnlyKeyValueStore<Long, String> store = streams.store(queryableName, QueryableStoreTypes.<Long, String>keyValueStore());
+
+        final KeyValueIterator<Long, String> all = store.all();
+        final KeyValue<Long, String> onlyEntry = all.next();
+
+        try {
+            assertThat(onlyEntry.key, is(anyUniqueKey));
+            assertThat(onlyEntry.value, is(expectedFinalResult));
+            assertThat(all.hasNext(), is(false));
+        } finally {
+            all.close();
+        }
+    }
+
+    private final class Input<V> {
+        String topic;
+        KeyValue<Long, V> record;
+
+        Input(final String topic, final V value) {
+            this.topic = topic;
+            record = KeyValue.pair(anyUniqueKey, value);
+        }
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
deleted file mode 100644
index faa581b175c17..0000000000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ /dev/null
@@ -1,400 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package org.apache.kafka.streams.integration; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; -import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; -import org.apache.kafka.streams.kstream.JoinWindows; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.Is.is; - -/** - * Tests all available joins of Kafka Streams DSL. - */ -@Category({IntegrationTest.class}) -public class JoinIntegrationTest { - @ClassRule - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); - - private static final String APP_ID = "join-integration-test"; - private static final String INPUT_TOPIC_1 = "inputTopicLeft"; - private static final String INPUT_TOPIC_2 = "inputTopicRight"; - private static final String OUTPUT_TOPIC = "outputTopic"; - - private final static Properties PRODUCER_CONFIG = new Properties(); - private final static Properties RESULT_CONSUMER_CONFIG = new Properties(); - private final static Properties STREAMS_CONFIG = new Properties(); - - private StreamsBuilder builder; - private KStream leftStream; - private KStream rightStream; - private KTable leftTable; - private KTable rightTable; - - private final List> input = Arrays.asList( - new Input<>(INPUT_TOPIC_1, (String) null), - new Input<>(INPUT_TOPIC_2, (String) null), - new Input<>(INPUT_TOPIC_1, "A"), - new Input<>(INPUT_TOPIC_2, "a"), - new Input<>(INPUT_TOPIC_1, "B"), - new Input<>(INPUT_TOPIC_2, "b"), - new Input<>(INPUT_TOPIC_1, (String) null), - new Input<>(INPUT_TOPIC_2, (String) null), - new Input<>(INPUT_TOPIC_1, "C"), - new Input<>(INPUT_TOPIC_2, "c"), - new Input<>(INPUT_TOPIC_2, (String) null), - new Input<>(INPUT_TOPIC_1, (String) null), - new Input<>(INPUT_TOPIC_2, (String) null), - new Input<>(INPUT_TOPIC_2, "d"), - new Input<>(INPUT_TOPIC_1, "D") - ); - - private final ValueJoiner valueJoiner = new ValueJoiner() { - @Override - public String apply(final String value1, final String value2) { - return value1 + "-" + value2; - } - }; - - @BeforeClass - public static void setupConfigsAndUtils() { - PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all"); - PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0); - PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
LongSerializer.class); - PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - - RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer"); - RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); - RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - - STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); - STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); - STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); - } - - @Before - public void prepareTopology() throws InterruptedException { - CLUSTER.createTopics(INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC); - - builder = new StreamsBuilder(); - leftTable = builder.table(INPUT_TOPIC_1); - rightTable = builder.table(INPUT_TOPIC_2); - leftStream = leftTable.toStream(); - rightStream = rightTable.toStream(); - } - - @After - public void cleanup() throws InterruptedException { - CLUSTER.deleteTopicsAndWait(120000, INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC); - } - - private void checkResult(final String outputTopic, final List expectedResult) throws InterruptedException { - if (expectedResult != null) { - final List result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), 30 * 1000L); - assertThat(result, is(expectedResult)); - } - } - - /* - * Runs the actual test. Checks the result after each input record to ensure fixed processing order. 
- * If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry - */ - private void runTest(final List> expectedResult) throws Exception { - assert expectedResult.size() == input.size(); - - IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG); - final KafkaStreams streams = new KafkaStreams(builder.build(), STREAMS_CONFIG); - try { - streams.start(); - - long ts = System.currentTimeMillis(); - - final Iterator> resultIterator = expectedResult.iterator(); - for (final Input singleInput : input) { - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(singleInput.topic, Collections.singleton(singleInput.record), PRODUCER_CONFIG, ++ts); - checkResult(OUTPUT_TOPIC, resultIterator.next()); - } - } finally { - streams.close(); - } - } - - @Test - public void testInnerKStreamKStream() throws Exception { - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KStream-KStream"); - - final List> expectedResult = Arrays.asList( - null, - null, - null, - Collections.singletonList("A-a"), - Collections.singletonList("B-a"), - Arrays.asList("A-b", "B-b"), - null, - null, - Arrays.asList("C-a", "C-b"), - Arrays.asList("A-c", "B-c", "C-c"), - null, - null, - null, - Arrays.asList("A-d", "B-d", "C-d"), - Arrays.asList("D-a", "D-b", "D-c", "D-d") - ); - - leftStream.join(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC); - - runTest(expectedResult); - } - - @Test - public void testLeftKStreamKStream() throws Exception { - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KStream-KStream"); - - final List> expectedResult = Arrays.asList( - null, - null, - Collections.singletonList("A-null"), - Collections.singletonList("A-a"), - Collections.singletonList("B-a"), - Arrays.asList("A-b", "B-b"), - null, - null, - Arrays.asList("C-a", "C-b"), - Arrays.asList("A-c", "B-c", "C-c"), - null, - null, - null, - Arrays.asList("A-d", "B-d", "C-d"), - Arrays.asList("D-a", "D-b", "D-c", "D-d") - ); - - leftStream.leftJoin(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC); - - runTest(expectedResult); - } - - @Test - public void testOuterKStreamKStream() throws Exception { - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-outer-KStream-KStream"); - - final List> expectedResult = Arrays.asList( - null, - null, - Collections.singletonList("A-null"), - Collections.singletonList("A-a"), - Collections.singletonList("B-a"), - Arrays.asList("A-b", "B-b"), - null, - null, - Arrays.asList("C-a", "C-b"), - Arrays.asList("A-c", "B-c", "C-c"), - null, - null, - null, - Arrays.asList("A-d", "B-d", "C-d"), - Arrays.asList("D-a", "D-b", "D-c", "D-d") - ); - - leftStream.outerJoin(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC); - - runTest(expectedResult); - } - - @Test - public void testInnerKStreamKTable() throws Exception { - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KStream-KTable"); - - final List> expectedResult = Arrays.asList( - null, - null, - null, - null, - Collections.singletonList("B-a"), - null, - null, - null, - null, - null, - null, - null, - null, - null, - Collections.singletonList("D-d") - ); - - leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC); - - runTest(expectedResult); - } - - @Test - public void testLeftKStreamKTable() throws Exception { - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KStream-KTable"); - - final List> expectedResult = Arrays.asList( - null, - null, - 
Collections.singletonList("A-null"), - null, - Collections.singletonList("B-a"), - null, - null, - null, - Collections.singletonList("C-null"), - null, - null, - null, - null, - null, - Collections.singletonList("D-d") - ); - - leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC); - - runTest(expectedResult); - } - - @Test - public void testInnerKTableKTable() throws Exception { - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KTable-KTable"); - - final List> expectedResult = Arrays.asList( - null, - null, - null, - Collections.singletonList("A-a"), - Collections.singletonList("B-a"), - Collections.singletonList("B-b"), - Collections.singletonList((String) null), - null, - null, - Collections.singletonList("C-c"), - Collections.singletonList((String) null), - null, - null, - null, - Collections.singletonList("D-d") - ); - - leftTable.join(rightTable, valueJoiner).to(OUTPUT_TOPIC); - - runTest(expectedResult); - } - - @Test - public void testLeftKTableKTable() throws Exception { - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KTable-KTable"); - - final List> expectedResult = Arrays.asList( - null, - null, - Collections.singletonList("A-null"), - Collections.singletonList("A-a"), - Collections.singletonList("B-a"), - Collections.singletonList("B-b"), - Collections.singletonList((String) null), - null, - Collections.singletonList("C-null"), - Collections.singletonList("C-c"), - Collections.singletonList("C-null"), - Collections.singletonList((String) null), - null, - null, - Collections.singletonList("D-d") - ); - - leftTable.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC); - - runTest(expectedResult); - } - - @Test - public void testOuterKTableKTable() throws Exception { - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-outer-KTable-KTable"); - - final List> expectedResult = Arrays.asList( - null, - null, - Collections.singletonList("A-null"), - Collections.singletonList("A-a"), - Collections.singletonList("B-a"), - Collections.singletonList("B-b"), - Collections.singletonList("null-b"), - Collections.singletonList((String) null), - Collections.singletonList("C-null"), - Collections.singletonList("C-c"), - Collections.singletonList("C-null"), - Collections.singletonList((String) null), - null, - Collections.singletonList("null-d"), - Collections.singletonList("D-d") - ); - - leftTable.outerJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC); - - runTest(expectedResult); - } - - private final class Input { - String topic; - KeyValue record; - - private final long anyUniqueKey = 0L; - - Input(final String topic, final V value) { - this.topic = topic; - record = KeyValue.pair(anyUniqueKey, value); - } - } -} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java deleted file mode 100644 index 32546de0ef0b6..0000000000000 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java +++ /dev/null @@ -1,386 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.integration; - -import kafka.utils.MockTime; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.Consumed; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; -import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; -import org.apache.kafka.streams.kstream.JoinWindows; -import org.apache.kafka.streams.kstream.Joined; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.MockMapper; -import org.apache.kafka.test.MockValueJoiner; -import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.Is.is; - -@Category({IntegrationTest.class}) -public class KStreamRepartitionJoinTest { - - private static final int NUM_BROKERS = 1; - private static final long COMMIT_INTERVAL_MS = 300L; - - @ClassRule - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); - public static final ValueJoiner TOSTRING_JOINER = MockValueJoiner.instance(":"); - private final MockTime mockTime = CLUSTER.time; - private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS); - - private StreamsBuilder builder; - private Properties streamsConfiguration; - private KStream streamOne; - private KStream streamTwo; - private KStream streamFour; - private KeyValueMapper> keyMapper; - - private final List - expectedStreamOneTwoJoin = Arrays.asList("1:A", "2:B", "3:C", "4:D", "5:E"); - private KafkaStreams kafkaStreams; - private String streamOneInput; - private String streamTwoInput; - private String streamFourInput; - private static volatile int testNo = 0; - - @Before - public void before() throws InterruptedException { - testNo++; - String applicationId = "kstream-repartition-join-test-" + testNo; - builder = new StreamsBuilder(); - createTopics(); - streamsConfiguration = new Properties(); - 
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); - streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3); - streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); - - streamOne = builder.stream(streamOneInput, Consumed.with(Serdes.Long(), Serdes.Integer())); - streamTwo = builder.stream(streamTwoInput, Consumed.with(Serdes.Integer(), Serdes.String())); - streamFour = builder.stream(streamFourInput, Consumed.with(Serdes.Integer(), Serdes.String())); - - keyMapper = MockMapper.selectValueKeyValueMapper(); - } - - @After - public void whenShuttingDown() throws IOException { - if (kafkaStreams != null) { - kafkaStreams.close(); - } - IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); - } - - @Test - public void shouldCorrectlyRepartitionOnJoinOperationsWithZeroSizedCache() throws Exception { - verifyRepartitionOnJoinOperations(0); - } - - @Test - public void shouldCorrectlyRepartitionOnJoinOperationsWithNonZeroSizedCache() throws Exception { - verifyRepartitionOnJoinOperations(10 * 1024 * 1024); - } - - private void verifyRepartitionOnJoinOperations(final int cacheSizeBytes) throws Exception { - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes); - produceMessages(); - final ExpectedOutputOnTopic mapOne = mapStreamOneAndJoin(); - final ExpectedOutputOnTopic mapBoth = mapBothStreamsAndJoin(); - final ExpectedOutputOnTopic mapMapJoin = mapMapJoin(); - final ExpectedOutputOnTopic selectKeyJoin = selectKeyAndJoin(); - final ExpectedOutputOnTopic flatMapJoin = flatMapJoin(); - final ExpectedOutputOnTopic mapRhs = joinMappedRhsStream(); - final ExpectedOutputOnTopic mapJoinJoin = joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined(); - final ExpectedOutputOnTopic leftJoin = mapBothStreamsAndLeftJoin(); - - startStreams(); - - verifyCorrectOutput(mapOne); - verifyCorrectOutput(mapBoth); - verifyCorrectOutput(mapMapJoin); - verifyCorrectOutput(selectKeyJoin); - verifyCorrectOutput(flatMapJoin); - verifyCorrectOutput(mapRhs); - verifyCorrectOutput(mapJoinJoin); - verifyCorrectOutput(leftJoin); - } - - private ExpectedOutputOnTopic mapStreamOneAndJoin() throws InterruptedException { - String mapOneStreamAndJoinOutput = "map-one-join-output-" + testNo; - doJoin(streamOne.map(keyMapper), streamTwo, mapOneStreamAndJoinOutput); - return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, mapOneStreamAndJoinOutput); - } - - private ExpectedOutputOnTopic mapBothStreamsAndJoin() throws InterruptedException { - final KStream map1 = streamOne.map(keyMapper); - final KStream map2 = streamTwo.map(MockMapper.noOpKeyValueMapper()); - - doJoin(map1, map2, "map-both-streams-and-join-" + testNo); - return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, "map-both-streams-and-join-" + testNo); - } - - private ExpectedOutputOnTopic mapMapJoin() throws InterruptedException { - final KStream mapMapStream = streamOne.map( - new KeyValueMapper>() { - @Override - public KeyValue apply(final Long key, final Integer value) { - if (value == null) { - return new KeyValue<>(null, null); - } - return new KeyValue<>(key + value, value); - } - 
}).map(keyMapper); - - final String outputTopic = "map-map-join-" + testNo; - doJoin(mapMapStream, streamTwo, outputTopic); - return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic); - } - - private ExpectedOutputOnTopic selectKeyAndJoin() throws Exception { - - final KStream keySelected = - streamOne.selectKey(MockMapper.selectValueMapper()); - - final String outputTopic = "select-key-join-" + testNo; - doJoin(keySelected, streamTwo, outputTopic); - return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic); - } - - private ExpectedOutputOnTopic flatMapJoin() throws InterruptedException { - final KStream flatMapped = streamOne.flatMap( - new KeyValueMapper>>() { - @Override - public Iterable> apply(final Long key, final Integer value) { - return Collections.singletonList(new KeyValue<>(value, value)); - } - }); - - final String outputTopic = "flat-map-join-" + testNo; - doJoin(flatMapped, streamTwo, outputTopic); - - return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic); - } - - private ExpectedOutputOnTopic joinMappedRhsStream() throws InterruptedException { - - final String output = "join-rhs-stream-mapped-" + testNo; - CLUSTER.createTopic(output); - streamTwo - .join(streamOne.map(keyMapper), - TOSTRING_JOINER, - getJoinWindow(), - Joined.with(Serdes.Integer(), Serdes.String(), Serdes.Integer())) - .to(Serdes.Integer(), Serdes.String(), output); - - return new ExpectedOutputOnTopic(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"), output); - } - - private ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws InterruptedException { - final KStream map1 = streamOne.map(keyMapper); - - final KStream map2 = streamTwo.map(MockMapper.noOpKeyValueMapper()); - - - final String outputTopic = "left-join-" + testNo; - CLUSTER.createTopic(outputTopic); - map1.leftJoin(map2, - TOSTRING_JOINER, - getJoinWindow(), - Joined.with(Serdes.Integer(), Serdes.Integer(), Serdes.String())) - .filterNot(new Predicate() { - @Override - public boolean test(Integer key, String value) { - // filter not left-only join results - return value.substring(2).equals("null"); - } - }) - .to(Serdes.Integer(), Serdes.String(), outputTopic); - - return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic); - } - - private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws InterruptedException { - final KStream map1 = streamOne.map(keyMapper); - - final KeyValueMapper> - kvMapper = MockMapper.noOpKeyValueMapper(); - - final KStream map2 = streamTwo.map(kvMapper); - - final KStream join = map1.join(map2, - TOSTRING_JOINER, - getJoinWindow(), - Joined.with(Serdes.Integer(), Serdes.Integer(), Serdes.String())); - - final String topic = "map-join-join-" + testNo; - CLUSTER.createTopic(topic); - join.map(kvMapper) - .join(streamFour.map(kvMapper), - TOSTRING_JOINER, - getJoinWindow(), - Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String())) - .to(Serdes.Integer(), Serdes.String(), topic); - - - return new ExpectedOutputOnTopic(Arrays.asList("1:A:A", "2:B:B", "3:C:C", "4:D:D", "5:E:E"), topic); - } - - private JoinWindows getJoinWindow() { - return JoinWindows.of(WINDOW_SIZE).until(3 * WINDOW_SIZE); - } - - - private class ExpectedOutputOnTopic { - private final List expectedOutput; - private final String outputTopic; - - ExpectedOutputOnTopic(final List expectedOutput, final String outputTopic) { - this.expectedOutput = expectedOutput; - this.outputTopic = outputTopic; - } - } - - private void verifyCorrectOutput(final 
ExpectedOutputOnTopic expectedOutputOnTopic) - throws InterruptedException { - assertThat(receiveMessages(new StringDeserializer(), - expectedOutputOnTopic.expectedOutput.size(), - expectedOutputOnTopic.outputTopic), - is(expectedOutputOnTopic.expectedOutput)); - } - - private void produceMessages() throws Exception { - produceToStreamOne(); - produceStreamTwoInputTo(streamTwoInput); - produceStreamTwoInputTo(streamFourInput); - - } - - private void produceStreamTwoInputTo(final String streamTwoInput) throws Exception { - IntegrationTestUtils.produceKeyValuesSynchronously( - streamTwoInput, - Arrays.asList( - new KeyValue<>(1, "A"), - new KeyValue<>(2, "B"), - new KeyValue<>(3, "C"), - new KeyValue<>(4, "D"), - new KeyValue<>(5, "E")), - TestUtils.producerConfig( - CLUSTER.bootstrapServers(), - IntegerSerializer.class, - StringSerializer.class, - new Properties()), - mockTime); - } - - private void produceToStreamOne() throws Exception { - IntegrationTestUtils.produceKeyValuesSynchronously( - streamOneInput, - Arrays.asList( - new KeyValue<>(10L, 1), - new KeyValue<>(5L, 2), - new KeyValue<>(12L, 3), - new KeyValue<>(15L, 4), - new KeyValue<>(20L, 5), - new KeyValue(70L, null)), // nulls should be filtered - TestUtils.producerConfig( - CLUSTER.bootstrapServers(), - LongSerializer.class, - IntegerSerializer.class, - new Properties()), - mockTime); - } - - private void createTopics() throws InterruptedException { - streamOneInput = "stream-one-" + testNo; - streamTwoInput = "stream-two-" + testNo; - streamFourInput = "stream-four-" + testNo; - CLUSTER.createTopic(streamOneInput, 2, 1); - CLUSTER.createTopic(streamTwoInput, 2, 1); - CLUSTER.createTopic(streamFourInput, 2, 1); - } - - - private void startStreams() { - kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); - kafkaStreams.start(); - } - - - private List receiveMessages(final Deserializer valueDeserializer, - final int numMessages, final String topic) throws InterruptedException { - - final Properties config = new Properties(); - - config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-test"); - config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - IntegerDeserializer.class.getName()); - config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - valueDeserializer.getClass().getName()); - final List received = IntegrationTestUtils.waitUntilMinValuesRecordsReceived( - config, - topic, - numMessages, - 60 * 1000); - Collections.sort(received); - - return received; - } - - private void doJoin(final KStream lhs, - final KStream rhs, - final String outputTopic) throws InterruptedException { - CLUSTER.createTopic(outputTopic); - lhs.join(rhs, - TOSTRING_JOINER, - getJoinWindow(), - Joined.with(Serdes.Integer(), Serdes.Integer(), Serdes.String())) - .to(Serdes.Integer(), Serdes.String(), outputTopic); - } - -} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java deleted file mode 100644 index a12ffac58ab57..0000000000000 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java +++ /dev/null @@ -1,399 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.integration; - -import kafka.utils.MockTime; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; -import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.QueryableStoreTypes; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -@Category({IntegrationTest.class}) -public class KTableKTableJoinIntegrationTest { - private final static int NUM_BROKERS = 1; - - @ClassRule - public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); - private final static MockTime MOCK_TIME = CLUSTER.time; - private final static String TABLE_1 = "table1"; - private final static String TABLE_2 = "table2"; - private final static String TABLE_3 = "table3"; - private final static String OUTPUT = "output-"; - private static Properties streamsConfig; - private KafkaStreams streams; - private final static Properties CONSUMER_CONFIG = new Properties(); - - @BeforeClass - public static void beforeTest() throws Exception { - CLUSTER.createTopics(TABLE_1, TABLE_2, TABLE_3, OUTPUT); - - streamsConfig = new Properties(); - streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - 
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - streamsConfig.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); - streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); - - final Properties producerConfig = new Properties(); - producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); - producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - - final List> table1 = Arrays.asList( - new KeyValue<>("a", "A1"), - new KeyValue<>("b", "B1") - ); - - final List> table2 = Arrays.asList( - new KeyValue<>("b", "B2"), - new KeyValue<>("c", "C2") - ); - - final List> table3 = Arrays.asList( - new KeyValue<>("a", "A3"), - new KeyValue<>("b", "B3"), - new KeyValue<>("c", "C3") - ); - - // put table 3 first, to make sure data is there when joining T1 with T2 - IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_3, table3, producerConfig, MOCK_TIME); - IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, producerConfig, MOCK_TIME); - IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, producerConfig, MOCK_TIME); - - CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-consumer"); - CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - } - - @Before - public void before() throws IOException { - IntegrationTestUtils.purgeLocalStreamsState(streamsConfig); - } - - @After - public void after() throws IOException { - if (streams != null) { - streams.close(); - streams = null; - } - IntegrationTestUtils.purgeLocalStreamsState(streamsConfig); - } - - private enum JoinType { - INNER, LEFT, OUTER - } - - - @Test - public void shouldInnerInnerJoin() throws InterruptedException { - verifyKTableKTableJoin(JoinType.INNER, JoinType.INNER, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), false); - } - - @Test - public void shouldInnerInnerJoinQueryable() throws InterruptedException { - verifyKTableKTableJoin(JoinType.INNER, JoinType.INNER, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), true); - } - - @Test - public void shouldInnerLeftJoin() throws InterruptedException { - verifyKTableKTableJoin(JoinType.INNER, JoinType.LEFT, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), false); - } - - @Test - public void shouldInnerLeftJoinQueryable() throws InterruptedException { - verifyKTableKTableJoin(JoinType.INNER, JoinType.LEFT, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), true); - } - - @Test - public void shouldInnerOuterJoin() throws InterruptedException { - verifyKTableKTableJoin(JoinType.INNER, JoinType.OUTER, Arrays.asList( - new KeyValue<>("a", "null-A3"), - new KeyValue<>("b", "null-B3"), - new KeyValue<>("c", "null-C3"), - new KeyValue<>("b", "B1-B2-B3")), false); - } - - @Test - public void 
shouldInnerOuterJoinQueryable() throws InterruptedException { - verifyKTableKTableJoin(JoinType.INNER, JoinType.OUTER, Arrays.asList( - new KeyValue<>("a", "null-A3"), - new KeyValue<>("b", "null-B3"), - new KeyValue<>("c", "null-C3"), - new KeyValue<>("b", "B1-B2-B3")), true); - } - - @Test - public void shouldLeftInnerJoin() throws InterruptedException { - verifyKTableKTableJoin(JoinType.LEFT, JoinType.INNER, Arrays.asList( - new KeyValue<>("a", "A1-null-A3"), - new KeyValue<>("b", "B1-null-B3"), - new KeyValue<>("b", "B1-B2-B3")), false); - } - - @Test - public void shouldLeftInnerJoinQueryable() throws InterruptedException { - verifyKTableKTableJoin(JoinType.LEFT, JoinType.INNER, Arrays.asList( - new KeyValue<>("a", "A1-null-A3"), - new KeyValue<>("b", "B1-null-B3"), - new KeyValue<>("b", "B1-B2-B3")), true); - } - - @Test - public void shouldLeftLeftJoin() throws InterruptedException { - verifyKTableKTableJoin(JoinType.LEFT, JoinType.LEFT, Arrays.asList( - new KeyValue<>("a", "A1-null-A3"), - new KeyValue<>("b", "B1-null-B3"), - new KeyValue<>("b", "B1-B2-B3")), false); - } - - @Test - public void shouldLeftLeftJoinQueryable() throws InterruptedException { - verifyKTableKTableJoin(JoinType.LEFT, JoinType.LEFT, Arrays.asList( - new KeyValue<>("a", "A1-null-A3"), - new KeyValue<>("b", "B1-null-B3"), - new KeyValue<>("b", "B1-B2-B3")), true); - } - - @Test - public void shouldLeftOuterJoin() throws InterruptedException { - verifyKTableKTableJoin(JoinType.LEFT, JoinType.OUTER, Arrays.asList( - new KeyValue<>("a", "null-A3"), - new KeyValue<>("b", "null-B3"), - new KeyValue<>("c", "null-C3"), - new KeyValue<>("a", "A1-null-A3"), - new KeyValue<>("b", "B1-null-B3"), - new KeyValue<>("b", "B1-B2-B3")), false); - } - - @Test - public void shouldLeftOuterJoinQueryable() throws InterruptedException { - verifyKTableKTableJoin(JoinType.LEFT, JoinType.OUTER, Arrays.asList( - new KeyValue<>("a", "null-A3"), - new KeyValue<>("b", "null-B3"), - new KeyValue<>("c", "null-C3"), - new KeyValue<>("a", "A1-null-A3"), - new KeyValue<>("b", "B1-null-B3"), - new KeyValue<>("b", "B1-B2-B3")), true); - } - - @Test - public void shouldOuterInnerJoin() throws InterruptedException { - verifyKTableKTableJoin(JoinType.OUTER, JoinType.INNER, Arrays.asList( - new KeyValue<>("a", "A1-null-A3"), - new KeyValue<>("b", "B1-null-B3"), - new KeyValue<>("b", "B1-B2-B3"), - new KeyValue<>("c", "null-C2-C3")), false); - } - - @Test - public void shouldOuterInnerJoinQueryable() throws InterruptedException { - verifyKTableKTableJoin(JoinType.OUTER, JoinType.INNER, Arrays.asList( - new KeyValue<>("a", "A1-null-A3"), - new KeyValue<>("b", "B1-null-B3"), - new KeyValue<>("b", "B1-B2-B3"), - new KeyValue<>("c", "null-C2-C3")), true); - } - - @Test - public void shouldOuterLeftJoin() throws InterruptedException { - verifyKTableKTableJoin(JoinType.OUTER, JoinType.LEFT, Arrays.asList( - new KeyValue<>("a", "A1-null-A3"), - new KeyValue<>("b", "B1-null-B3"), - new KeyValue<>("b", "B1-B2-B3"), - new KeyValue<>("c", "null-C2-C3")), false); - } - - @Test - public void shouldOuterLeftJoinQueryable() throws InterruptedException { - verifyKTableKTableJoin(JoinType.OUTER, JoinType.LEFT, Arrays.asList( - new KeyValue<>("a", "A1-null-A3"), - new KeyValue<>("b", "B1-null-B3"), - new KeyValue<>("b", "B1-B2-B3"), - new KeyValue<>("c", "null-C2-C3")), true); - } - - @Test - public void shouldOuterOuterJoin() throws InterruptedException { - verifyKTableKTableJoin(JoinType.OUTER, JoinType.OUTER, Arrays.asList( - new KeyValue<>("a", "null-A3"), - new 
KeyValue<>("b", "null-B3"), - new KeyValue<>("c", "null-C3"), - new KeyValue<>("a", "A1-null-A3"), - new KeyValue<>("b", "B1-null-B3"), - new KeyValue<>("b", "B1-B2-B3"), - new KeyValue<>("c", "null-C2-C3")), false); - } - - @Test - public void shouldOuterOuterJoinQueryable() throws InterruptedException { - verifyKTableKTableJoin(JoinType.OUTER, JoinType.OUTER, Arrays.asList( - new KeyValue<>("a", "null-A3"), - new KeyValue<>("b", "null-B3"), - new KeyValue<>("c", "null-C3"), - new KeyValue<>("a", "A1-null-A3"), - new KeyValue<>("b", "B1-null-B3"), - new KeyValue<>("b", "B1-B2-B3"), - new KeyValue<>("c", "null-C2-C3")), true); - } - - - private void verifyKTableKTableJoin(final JoinType joinType1, - final JoinType joinType2, - final List> expectedResult, - boolean verifyQueryableState) throws InterruptedException { - final String queryableName = verifyQueryableState ? joinType1 + "-" + joinType2 + "-ktable-ktable-join-query" : null; - streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join" + queryableName); - - streams = prepareTopology(joinType1, joinType2, queryableName); - streams.start(); - - final List> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - CONSUMER_CONFIG, - OUTPUT, - expectedResult.size()); - - assertThat(result, equalTo(expectedResult)); - - if (verifyQueryableState) { - verifyKTableKTableJoinQueryableState(joinType1, joinType2, expectedResult); - } - } - - private void verifyKTableKTableJoinQueryableState(final JoinType joinType1, - final JoinType joinType2, - final List> expectedResult) { - final String queryableName = joinType1 + "-" + joinType2 + "-ktable-ktable-join-query"; - final ReadOnlyKeyValueStore myJoinStore = streams.store(queryableName, - QueryableStoreTypes.keyValueStore()); - - // store only keeps last set of values, not entire stream of value changes - final Map expectedInStore = new HashMap<>(); - for (KeyValue expected : expectedResult) { - expectedInStore.put(expected.key, expected.value); - } - - for (Map.Entry expected : expectedInStore.entrySet()) { - assertEquals(expected.getValue(), myJoinStore.get(expected.getKey())); - } - final KeyValueIterator all = myJoinStore.all(); - while (all.hasNext()) { - KeyValue storeEntry = all.next(); - assertTrue(expectedResult.contains(storeEntry)); - } - all.close(); - - } - - private KafkaStreams prepareTopology(final JoinType joinType1, final JoinType joinType2, final String queryableName) { - final StreamsBuilder builder = new StreamsBuilder(); - - final KTable table1 = builder.table(TABLE_1); - final KTable table2 = builder.table(TABLE_2); - final KTable table3 = builder.table(TABLE_3); - - Materialized> materialized = null; - if (queryableName != null) { - materialized = Materialized.>as(queryableName) - .withKeySerde(Serdes.String()) - .withValueSerde(Serdes.String()) - .withCachingDisabled(); - } - join(join(table1, table2, joinType1, null /* no need to query intermediate result */), table3, - joinType2, materialized).to(OUTPUT); - - return new KafkaStreams(builder.build(), new StreamsConfig(streamsConfig)); - } - - private KTable join(final KTable first, - final KTable second, - final JoinType joinType, - final Materialized> materialized) { - final ValueJoiner joiner = new ValueJoiner() { - @Override - public String apply(final String value1, final String value2) { - return value1 + "-" + value2; - } - }; - - - switch (joinType) { - case INNER: - if (materialized != null) { - return first.join(second, joiner, materialized); - } else { 
- return first.join(second, joiner); - } - case LEFT: - if (materialized != null) { - return first.leftJoin(second, joiner, materialized); - } else { - return first.leftJoin(second, joiner); - } - case OUTER: - if (materialized != null) { - return first.outerJoin(second, joiner, materialized); - } else { - return first.outerJoin(second, joiner); - } - } - - throw new RuntimeException("Unknown join type."); - } - -} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java new file mode 100644 index 0000000000000..571dc058d8cad --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.MockMapper; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + + +/** + * Tests all available joins of Kafka Streams DSL. 
+ */
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest {
+    private KStream<Long, String> leftStream;
+    private KStream<Long, String> rightStream;
+
+    public StreamStreamJoinIntegrationTest(boolean cacheEnabled) {
+        super(cacheEnabled);
+    }
+
+    @Before
+    public void prepareTopology() throws InterruptedException {
+        super.prepareEnvironment();
+
+        appID = "stream-stream-join-integration-test";
+
+        builder = new StreamsBuilder();
+        leftStream = builder.stream(INPUT_TOPIC_LEFT);
+        rightStream = builder.stream(INPUT_TOPIC_RIGHT);
+    }
+
+    @Test
+    public void testInner() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+            null,
+            null,
+            null,
+            Collections.singletonList("A-a"),
+            Collections.singletonList("B-a"),
+            Arrays.asList("A-b", "B-b"),
+            null,
+            null,
+            Arrays.asList("C-a", "C-b"),
+            Arrays.asList("A-c", "B-c", "C-c"),
+            null,
+            null,
+            null,
+            Arrays.asList("A-d", "B-d", "C-d"),
+            Arrays.asList("D-a", "D-b", "D-c", "D-d")
+        );
+
+        leftStream.join(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
+
+        runTest(expectedResult);
+    }
+
+    @Test
+    public void testInnerRepartitioned() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-repartitioned");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+            null,
+            null,
+            null,
+            Collections.singletonList("A-a"),
+            Collections.singletonList("B-a"),
+            Arrays.asList("A-b", "B-b"),
+            null,
+            null,
+            Arrays.asList("C-a", "C-b"),
+            Arrays.asList("A-c", "B-c", "C-c"),
+            null,
+            null,
+            null,
+            Arrays.asList("A-d", "B-d", "C-d"),
+            Arrays.asList("D-a", "D-b", "D-c", "D-d")
+        );
+
+        leftStream.map(MockMapper.noOpKeyValueMapper())
+            .join(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
+                    .selectKey(MockMapper.selectKeyKeyValueMapper()),
+                valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
+
+        runTest(expectedResult);
+    }
+
+    @Test
+    public void testLeft() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+            null,
+            null,
+            Collections.singletonList("A-null"),
+            Collections.singletonList("A-a"),
+            Collections.singletonList("B-a"),
+            Arrays.asList("A-b", "B-b"),
+            null,
+            null,
+            Arrays.asList("C-a", "C-b"),
+            Arrays.asList("A-c", "B-c", "C-c"),
+            null,
+            null,
+            null,
+            Arrays.asList("A-d", "B-d", "C-d"),
+            Arrays.asList("D-a", "D-b", "D-c", "D-d")
+        );
+
+        leftStream.leftJoin(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
+
+        runTest(expectedResult);
+    }
+
+    @Test
+    public void testLeftRepartitioned() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left-repartitioned");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+            null,
+            null,
+            Collections.singletonList("A-null"),
+            Collections.singletonList("A-a"),
+            Collections.singletonList("B-a"),
+            Arrays.asList("A-b", "B-b"),
+            null,
+            null,
+            Arrays.asList("C-a", "C-b"),
+            Arrays.asList("A-c", "B-c", "C-c"),
+            null,
+            null,
+            null,
+            Arrays.asList("A-d", "B-d", "C-d"),
+            Arrays.asList("D-a", "D-b", "D-c", "D-d")
+        );
+
+        leftStream.map(MockMapper.noOpKeyValueMapper())
+            .leftJoin(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
+                    .selectKey(MockMapper.selectKeyKeyValueMapper()),
+                valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
+
+        runTest(expectedResult);
+    }
+
+    @Test
+    public void testOuter() throws Exception
{ + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer"); + + final List> expectedResult = Arrays.asList( + null, + null, + Collections.singletonList("A-null"), + Collections.singletonList("A-a"), + Collections.singletonList("B-a"), + Arrays.asList("A-b", "B-b"), + null, + null, + Arrays.asList("C-a", "C-b"), + Arrays.asList("A-c", "B-c", "C-c"), + null, + null, + null, + Arrays.asList("A-d", "B-d", "C-d"), + Arrays.asList("D-a", "D-b", "D-c", "D-d") + ); + + leftStream.outerJoin(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC); + + runTest(expectedResult); + } + + @Test + public void testOuterRepartitioned() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer"); + + final List> expectedResult = Arrays.asList( + null, + null, + Collections.singletonList("A-null"), + Collections.singletonList("A-a"), + Collections.singletonList("B-a"), + Arrays.asList("A-b", "B-b"), + null, + null, + Arrays.asList("C-a", "C-b"), + Arrays.asList("A-c", "B-c", "C-c"), + null, + null, + null, + Arrays.asList("A-d", "B-d", "C-d"), + Arrays.asList("D-a", "D-b", "D-c", "D-d") + ); + + leftStream.map(MockMapper.noOpKeyValueMapper()) + .outerJoin(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper()) + .selectKey(MockMapper.selectKeyKeyValueMapper()), + valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC); + + runTest(expectedResult); + } + + @Test + public void testMultiInner() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-multi-inner"); + + final List> expectedResult = Arrays.asList( + null, + null, + null, + Collections.singletonList("A-a-a"), + Collections.singletonList("B-a-a"), + Arrays.asList("A-b-a", "B-b-a", "A-a-b", "B-a-b", "A-b-b", "B-b-b"), + null, + null, + Arrays.asList("C-a-a", "C-a-b", "C-b-a", "C-b-b"), + Arrays.asList("A-c-a", "A-c-b", "B-c-a", "B-c-b", "C-c-a", "C-c-b", "A-a-c", "B-a-c", + "A-b-c", "B-b-c", "C-a-c", "C-b-c", "A-c-c", "B-c-c", "C-c-c"), + null, + null, + null, + Arrays.asList("A-d-a", "A-d-b", "A-d-c", "B-d-a", "B-d-b", "B-d-c", "C-d-a", "C-d-b", "C-d-c", + "A-a-d", "B-a-d", "A-b-d", "B-b-d", "C-a-d", "C-b-d", "A-c-d", "B-c-d", "C-c-d", + "A-d-d", "B-d-d", "C-d-d"), + Arrays.asList("D-a-a", "D-a-b", "D-a-c", "D-a-d", "D-b-a", "D-b-b", "D-b-c", "D-b-d", "D-c-a", + "D-c-b", "D-c-c", "D-c-d", "D-d-a", "D-d-b", "D-d-c", "D-d-d") + ); + + leftStream.join(rightStream, valueJoiner, JoinWindows.of(10000)) + .join(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC); + + runTest(expectedResult); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java new file mode 100644 index 0000000000000..f3eceb097356c --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Tests all available joins of Kafka Streams DSL.
+ */
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
+    private KTable<Long, String> leftTable;
+    private KTable<Long, String> rightTable;
+
+    public TableTableJoinIntegrationTest(boolean cacheEnabled) {
+        super(cacheEnabled);
+    }
+
+    @Before
+    public void prepareTopology() throws InterruptedException {
+        super.prepareEnvironment();
+
+        appID = "table-table-join-integration-test";
+
+        builder = new StreamsBuilder();
+        leftTable = builder.table(INPUT_TOPIC_LEFT);
+        rightTable = builder.table(INPUT_TOPIC_RIGHT);
+    }
+
+    final private String expectedFinalJoinResult = "D-d";
+    final private String expectedFinalMultiJoinResult = "D-d-d";
+    final private String storeName = appID + "-store";
+
+    private Materialized<Long, String, KeyValueStore<Bytes, byte[]>> materialized = Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(storeName)
+            .withKeySerde(Serdes.Long())
+            .withValueSerde(Serdes.String())
+            .withCachingDisabled()
+            .withLoggingDisabled();
+
+    final private class CountingPeek implements ForeachAction<Long, String> {
+        final private String expected;
+
+        CountingPeek(final boolean multiJoin) {
+            this.expected = multiJoin ?
expectedFinalMultiJoinResult : expectedFinalJoinResult; + } + + @Override + public void apply(final Long key, final String value) { + numRecordsExpected++; + if (value.equals(expected)) { + boolean ret = finalResultReached.compareAndSet(false, true); + + if (!ret) { + // do nothing; it is possible that we will see multiple duplicates of final results due to KAFKA-4309 + // TODO: should be removed when KAFKA-4309 is fixed + } + } + } + } + + @Test + public void testInner() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner"); + + if (cacheEnabled) { + leftTable.join(rightTable, valueJoiner, materialized).toStream().peek(new CountingPeek(false)).to(OUTPUT_TOPIC); + runTest(expectedFinalJoinResult, storeName); + } else { + List> expectedResult = Arrays.asList( + null, + null, + null, + Collections.singletonList("A-a"), + Collections.singletonList("B-a"), + Collections.singletonList("B-b"), + Collections.singletonList((String) null), + null, + null, + Collections.singletonList("C-c"), + Collections.singletonList((String) null), + null, + null, + null, + Collections.singletonList("D-d") + ); + + leftTable.join(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC); + runTest(expectedResult, storeName); + } + } + + @Test + public void testLeft() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left"); + + if (cacheEnabled) { + leftTable.leftJoin(rightTable, valueJoiner, materialized).toStream().peek(new CountingPeek(false)).to(OUTPUT_TOPIC); + runTest(expectedFinalJoinResult, storeName); + } else { + List> expectedResult = Arrays.asList( + null, + null, + Collections.singletonList("A-null"), + Collections.singletonList("A-a"), + Collections.singletonList("B-a"), + Collections.singletonList("B-b"), + Collections.singletonList((String) null), + null, + Collections.singletonList("C-null"), + Collections.singletonList("C-c"), + Collections.singletonList("C-null"), + Collections.singletonList((String) null), + null, + null, + Collections.singletonList("D-d") + ); + + leftTable.leftJoin(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC); + runTest(expectedResult, storeName); + } + } + + @Test + public void testOuter() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer"); + + if (cacheEnabled) { + leftTable.outerJoin(rightTable, valueJoiner, materialized).toStream().peek(new CountingPeek(false)).to(OUTPUT_TOPIC); + runTest(expectedFinalJoinResult, storeName); + } else { + List> expectedResult = Arrays.asList( + null, + null, + Collections.singletonList("A-null"), + Collections.singletonList("A-a"), + Collections.singletonList("B-a"), + Collections.singletonList("B-b"), + Collections.singletonList("null-b"), + Collections.singletonList((String) null), + Collections.singletonList("C-null"), + Collections.singletonList("C-c"), + Collections.singletonList("C-null"), + Collections.singletonList((String) null), + null, + Collections.singletonList("null-d"), + Collections.singletonList("D-d") + ); + + leftTable.outerJoin(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC); + runTest(expectedResult, storeName); + } + } + + @Test + public void testInnerInner() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-inner"); + + if (cacheEnabled) { + leftTable.join(rightTable, valueJoiner) + .join(rightTable, valueJoiner, materialized) + .toStream() + .peek(new CountingPeek(true)) + .to(OUTPUT_TOPIC); + 
runTest(expectedFinalMultiJoinResult, storeName); + } else { + // FIXME: the duplicate below for all the multi-joins + // are due to KAFKA-6443, should be updated once it is fixed. + List> expectedResult = Arrays.asList( + null, + null, + null, + Arrays.asList("A-a-a", "A-a-a"), + Collections.singletonList("B-a-a"), + Arrays.asList("B-b-b", "B-b-b"), + Collections.singletonList((String) null), + null, + null, + Arrays.asList("C-c-c", "C-c-c"), + null, + null, + null, + null, + Collections.singletonList("D-d-d") + ); + + leftTable.join(rightTable, valueJoiner) + .join(rightTable, valueJoiner, materialized) + .toStream().to(OUTPUT_TOPIC); + + runTest(expectedResult, storeName); + } + } + + @Test + public void testInnerLeft() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-left"); + + if (cacheEnabled) { + leftTable.join(rightTable, valueJoiner) + .leftJoin(rightTable, valueJoiner, materialized) + .toStream() + .peek(new CountingPeek(true)) + .to(OUTPUT_TOPIC); + runTest(expectedFinalMultiJoinResult, storeName); + } else { + List> expectedResult = Arrays.asList( + null, + null, + null, + Arrays.asList("A-a-a", "A-a-a"), + Collections.singletonList("B-a-a"), + Arrays.asList("B-b-b", "B-b-b"), + Collections.singletonList((String) null), + null, + null, + Arrays.asList("C-c-c", "C-c-c"), + Collections.singletonList((String) null), + null, + null, + null, + Collections.singletonList("D-d-d") + ); + + leftTable.join(rightTable, valueJoiner) + .leftJoin(rightTable, valueJoiner, materialized) + .toStream() + .to(OUTPUT_TOPIC); + + runTest(expectedResult, storeName); + } + } + + @Test + public void testInnerOuter() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-outer"); + + if (cacheEnabled) { + leftTable.join(rightTable, valueJoiner) + .outerJoin(rightTable, valueJoiner, materialized) + .toStream() + .peek(new CountingPeek(true)) + .to(OUTPUT_TOPIC); + runTest(expectedFinalMultiJoinResult, storeName); + } else { + List> expectedResult = Arrays.asList( + null, + null, + null, + Arrays.asList("A-a-a", "A-a-a"), + Collections.singletonList("B-a-a"), + Arrays.asList("B-b-b", "B-b-b"), + Collections.singletonList("null-b"), + Collections.singletonList((String) null), + null, + Arrays.asList("C-c-c", "C-c-c"), + Arrays.asList((String) null, null), + null, + null, + null, + Arrays.asList("null-d", "D-d-d") + ); + + leftTable.join(rightTable, valueJoiner) + .outerJoin(rightTable, valueJoiner, materialized) + .toStream().to(OUTPUT_TOPIC); + + runTest(expectedResult, storeName); + } + } + + @Test + public void testLeftInner() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-inner"); + + if (cacheEnabled) { + leftTable.leftJoin(rightTable, valueJoiner) + .join(rightTable, valueJoiner, materialized) + .toStream() + .peek(new CountingPeek(true)) + .to(OUTPUT_TOPIC); + runTest(expectedFinalMultiJoinResult, storeName); + } else { + List> expectedResult = Arrays.asList( + null, + null, + null, + Arrays.asList("A-a-a", "A-a-a"), + Collections.singletonList("B-a-a"), + Arrays.asList("B-b-b", "B-b-b"), + Collections.singletonList((String) null), + null, + null, + Arrays.asList("C-c-c", "C-c-c"), + Collections.singletonList((String) null), + null, + null, + null, + Collections.singletonList("D-d-d") + ); + + leftTable.leftJoin(rightTable, valueJoiner) + .join(rightTable, valueJoiner, materialized) + .toStream() + .to(OUTPUT_TOPIC); + + runTest(expectedResult, storeName); + } + } + + 
@Test + public void testLeftLeft() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-left"); + + if (cacheEnabled) { + leftTable.leftJoin(rightTable, valueJoiner) + .leftJoin(rightTable, valueJoiner, materialized) + .toStream() + .peek(new CountingPeek(true)) + .to(OUTPUT_TOPIC); + runTest(expectedFinalMultiJoinResult, storeName); + } else { + List> expectedResult = Arrays.asList( + null, + null, + null, + Arrays.asList("A-null-null", "A-a-a", "A-a-a"), + Collections.singletonList("B-a-a"), + Arrays.asList("B-b-b", "B-b-b"), + Collections.singletonList((String) null), + null, + null, + Arrays.asList("C-null-null", "C-c-c", "C-c-c"), + Arrays.asList("C-null-null", "C-null-null"), + Collections.singletonList((String) null), + null, + null, + Collections.singletonList("D-d-d") + ); + + leftTable.leftJoin(rightTable, valueJoiner) + .leftJoin(rightTable, valueJoiner, materialized) + .toStream() + .to(OUTPUT_TOPIC); + + runTest(expectedResult, storeName); + } + } + + @Test + public void testLeftOuter() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-outer"); + + if (cacheEnabled) { + leftTable.leftJoin(rightTable, valueJoiner) + .outerJoin(rightTable, valueJoiner, materialized) + .toStream() + .peek(new CountingPeek(true)) + .to(OUTPUT_TOPIC); + runTest(expectedFinalMultiJoinResult, storeName); + } else { + List> expectedResult = Arrays.asList( + null, + null, + null, + Arrays.asList("A-null-null", "A-a-a", "A-a-a"), + Collections.singletonList("B-a-a"), + Arrays.asList("B-b-b", "B-b-b"), + Collections.singletonList("null-b"), + Collections.singletonList((String) null), + null, + Arrays.asList("C-null-null", "C-c-c", "C-c-c"), + Arrays.asList("C-null-null", "C-null-null"), + Collections.singletonList((String) null), + null, + null, + Arrays.asList("null-d", "D-d-d") + ); + + leftTable.leftJoin(rightTable, valueJoiner) + .outerJoin(rightTable, valueJoiner, materialized) + .toStream().to(OUTPUT_TOPIC); + + runTest(expectedResult, storeName); + } + } + + @Test + public void testOuterInner() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-inner"); + + if (cacheEnabled) { + leftTable.outerJoin(rightTable, valueJoiner) + .join(rightTable, valueJoiner, materialized) + .toStream() + .peek(new CountingPeek(true)) + .to(OUTPUT_TOPIC); + runTest(expectedFinalMultiJoinResult, storeName); + } else { + List> expectedResult = Arrays.asList( + null, + null, + null, + Arrays.asList("A-a-a", "A-a-a"), + Collections.singletonList("B-a-a"), + Arrays.asList("B-b-b", "B-b-b"), + Collections.singletonList("null-b-b"), + null, + null, + Arrays.asList("C-c-c", "C-c-c"), + Collections.singletonList((String) null), + null, + null, + Arrays.asList("null-d-d", "null-d-d"), + Collections.singletonList("D-d-d") + ); + + leftTable.outerJoin(rightTable, valueJoiner) + .join(rightTable, valueJoiner, materialized) + .toStream() + .to(OUTPUT_TOPIC); + + runTest(expectedResult, storeName); + } + } + + @Test + public void testOuterLeft() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-left"); + + if (cacheEnabled) { + leftTable.outerJoin(rightTable, valueJoiner) + .leftJoin(rightTable, valueJoiner, materialized) + .toStream() + .peek(new CountingPeek(true)) + .to(OUTPUT_TOPIC); + runTest(expectedFinalMultiJoinResult, storeName); + } else { + List> expectedResult = Arrays.asList( + null, + null, + null, + Arrays.asList("A-null-null", "A-a-a", "A-a-a"), + 
Collections.singletonList("B-a-a"),
+                Arrays.asList("B-b-b", "B-b-b"),
+                Collections.singletonList("null-b-b"),
+                Collections.singletonList((String) null),
+                null,
+                Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
+                Arrays.asList("C-null-null", "C-null-null"),
+                Collections.singletonList((String) null),
+                null,
+                Arrays.asList("null-d-d", "null-d-d"),
+                Collections.singletonList("D-d-d")
+            );
+
+            leftTable.outerJoin(rightTable, valueJoiner)
+                .leftJoin(rightTable, valueJoiner, materialized)
+                .toStream()
+                .to(OUTPUT_TOPIC);
+
+            runTest(expectedResult, storeName);
+        }
+    }
+
+    @Test
+    public void testOuterOuter() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-outer");
+
+        if (cacheEnabled) {
+            leftTable.outerJoin(rightTable, valueJoiner)
+                .outerJoin(rightTable, valueJoiner, materialized)
+                .toStream()
+                .peek(new CountingPeek(true))
+                .to(OUTPUT_TOPIC);
+            runTest(expectedFinalMultiJoinResult, storeName);
+        } else {
+            List<List<String>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
+                Collections.singletonList("B-a-a"),
+                Arrays.asList("B-b-b", "B-b-b"),
+                Collections.singletonList("null-b-b"),
+                Arrays.asList((String) null, null),
+                null,
+                Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
+                Arrays.asList("C-null-null", "C-null-null"),
+                Collections.singletonList((String) null),
+                null,
+                null,
+                Arrays.asList("null-d-d", "null-d-d", "D-d-d")
+            );
+
+            leftTable.outerJoin(rightTable, valueJoiner)
+                .outerJoin(rightTable, valueJoiner, materialized)
+                .toStream()
+                .to(OUTPUT_TOPIC);
+
+            runTest(expectedResult, storeName);
+        }
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockMapper.java b/streams/src/test/java/org/apache/kafka/test/MockMapper.java
index fec9522de82dc..518419984f183 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockMapper.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockMapper.java
@@ -20,6 +20,8 @@
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapper;
 
+import java.util.Collections;
+
 public class MockMapper {
 
     private static class NoOpKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<K, V>> {
@@ -29,6 +31,13 @@ public KeyValue<K, V> apply(K key, V value) {
         }
     }
 
+    private static class NoOpFlatKeyValueMapper<K, V> implements KeyValueMapper<K, V, Iterable<KeyValue<K, V>>> {
+        @Override
+        public Iterable<KeyValue<K, V>> apply(K key, V value) {
+            return Collections.singletonList(KeyValue.pair(key, value));
+        }
+    }
+
     private static class SelectValueKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<V, V>> {
         @Override
         public KeyValue<V, V> apply(K key, V value) {
@@ -61,6 +70,9 @@ public static <K, V> KeyValueMapper<K, V, K> selectKeyKeyValueMapper() {
         return new SelectKeyMapper<>();
     }
 
+    public static <K, V> KeyValueMapper<K, V, Iterable<KeyValue<K, V>>> noOpFlatKeyValueMapper() {
+        return new NoOpFlatKeyValueMapper<>();
+    }
     public static <K, V> KeyValueMapper<K, V, KeyValue<K, V>> noOpKeyValueMapper() {
         return new NoOpKeyValueMapper<>();