diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index ceebf8d9fa981..782d99a2a6c69 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -18,6 +18,7 @@
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -27,6 +28,7 @@
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
+import org.apache.kafka.streams.state.KeyValueStore;
 
 /**
  * {@code KStream} is an abstraction of a record stream of {@link KeyValue} pairs, i.e., each record is an
@@ -874,6 +876,73 @@ void to(final String topic,
     void to(final TopicNameExtractor<K, V> topicExtractor,
             final Produced<K, V> produced);
 
+    /**
+     * Convert this stream to a {@link KTable}.
+     * <p>
+     * If a key-changing operator was used before this operation, an internal repartitioning topic may be created in
+     * Kafka. This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated
+     * name, and "-repartition" is a fixed suffix.
+     * <p>
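+     * For example, a minimal sketch of the conversion (the topic name "input-topic" is illustrative):
+     * <pre>{@code
+     * StreamsBuilder builder = new StreamsBuilder();
+     * KStream<String, String> stream = builder.stream("input-topic");
+     * KTable<String, String> table = stream.toTable(); // per key, the latest record wins
+     * }</pre>
+     * <p>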
+     * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record
+     * of this stream was a "fact/event" and is now re-interpreted as an update (cf. {@link KStream} vs
+     * {@code KTable}).
+     *
+     * @return a {@link KTable} that contains the same records as this {@code KStream}
+     */
+    KTable<K, V> toTable();
+
+    /**
+     * Convert this stream to a {@link KTable}.
+     * <p>
+     * If a key-changing operator was used before this operation, an internal repartitioning topic may be created in
+     * Kafka. This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated
+     * name, and "-repartition" is a fixed suffix.
+     * <p>
+     * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record
+     * of this stream was a "fact/event" and is now re-interpreted as an update (cf. {@link KStream} vs
+     * {@code KTable}).
+     *
+     * @param named a {@link Named} config used to name the processor in the topology
+     * @return a {@link KTable} that contains the same records as this {@code KStream}
+     */
+    KTable<K, V> toTable(final Named named);
+
+    /**
+     * Convert this stream to a {@link KTable}.
+     * <p>
+     * If a key-changing operator was used before this operation, an internal repartitioning topic may be created in
+     * Kafka. This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated
+     * name, and "-repartition" is a fixed suffix.
+     * <p>
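+     * For example, a sketch of materializing the conversion into a queryable store (the store name "table-store"
+     * and the running {@code streams} instance are illustrative):
+     * <pre>{@code
+     * KTable<String, String> table = stream.toTable(Materialized.as("table-store"));
+     * // once the application is running, the store can be queried interactively:
+     * ReadOnlyKeyValueStore<String, String> view =
+     *     streams.store("table-store", QueryableStoreTypes.keyValueStore());
+     * }</pre>
+     * <p>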
+     * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record
+     * of this stream was a "fact/event" and is now re-interpreted as an update (cf. {@link KStream} vs
+     * {@code KTable}).
+     *
+     * @param materialized an instance of {@link Materialized} used to describe how the state store of the
+     *                     resulting table should be materialized.
+     * @return a {@link KTable} that contains the same records as this {@code KStream}
+     */
+    KTable<K, V> toTable(final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
+
+    /**
+     * Convert this stream to a {@link KTable}.
+     * <p>
+     * If a key-changing operator was used before this operation, an internal repartitioning topic may be created in
+     * Kafka. This topic will be named "${applicationId}-&lt;name&gt;-repartition", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is an internally generated
+     * name, and "-repartition" is a fixed suffix.
+     * <p>
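+     * For example, a sketch that names both the processor and the state store (both names are illustrative):
+     * <pre>{@code
+     * KTable<String, String> table = stream.toTable(
+     *     Named.as("stream-to-table"),
+     *     Materialized.as("table-store"));
+     * }</pre>
+     * <p>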
+     * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record
+     * of this stream was a "fact/event" and is now re-interpreted as an update (cf. {@link KStream} vs
+     * {@code KTable}).
+     *
+     * @param named        a {@link Named} config used to name the processor in the topology
+     * @param materialized an instance of {@link Materialized} used to describe how the state store of the
+     *                     resulting table should be materialized.
+     * @return a {@link KTable} that contains the same records as this {@code KStream}
+     */
+    KTable<K, V> toTable(final Named named,
+                         final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
+
     /**
      * Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
      * and default serializers and deserializers.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index ac631f9e935ef..33f017436f5c0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -17,7 +17,9 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.Grouped;
@@ -27,6 +29,7 @@
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Printed;
@@ -45,11 +48,13 @@
 import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamTableJoinNode;
+import org.apache.kafka.streams.kstream.internals.graph.StreamToTableNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
+import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.lang.reflect.Array;
 import java.util.Arrays;
@@ -58,6 +63,8 @@
 import java.util.Objects;
 import java.util.Set;
 
+import static org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.optimizableRepartitionNodeBuilder;
+
 public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K, V> {
 
     static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
@@ -110,6 +117,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K, V> {
 
+    static final String TO_KTABLE_NAME = "KSTREAM-TOTABLE-";
+
     private void to(final TopicNameExtractor<K, V> topicExtractor, final ProducedInternal<K, V> produced) {
         // …
         builder.addGraphNode(streamsGraphNode, sinkNode);
     }
 
+    @Override
+    public KTable<K, V> toTable() {
+        return toTable(NamedInternal.empty());
+    }
+
+    @Override
+    public KTable<K, V> toTable(final Named named) {
+        final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(Consumed.with(keySerde, valSerde));
+
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(
+                Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
+                builder,
+                TO_KTABLE_NAME);
+
+        return doToTable(named, materializedInternal);
+    }
+
+    @Override
+    public KTable<K, V> toTable(final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        return toTable(NamedInternal.empty(), materialized);
+    }
+
+    @Override
+    public KTable<K, V> toTable(final Named named,
+                                final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(materialized, "materialized can't be null");
+
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
+            new MaterializedInternal<>(materialized);
+        return doToTable(
+            named,
+            materializedInternal);
+    }
+
+    private KTable<K, V> doToTable(final Named named,
+                                   final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal) {
+        Objects.requireNonNull(named, "named can't be null");
+
+        final Serde<K> keySerdeOverride = materializedInternal.keySerde() == null
+            ? keySerde
+            : materializedInternal.keySerde();
+        final Serde<V> valueSerdeOverride = materializedInternal.valueSerde() == null
+            ? valSerde
+            : materializedInternal.valueSerde();
+
+        final NamedInternal namedInternal = new NamedInternal(named);
+        final String name = namedInternal.orElseGenerateWithPrefix(builder, TO_KTABLE_NAME);
+        final Set<String> subTopologySourceNodes;
+        final StreamsGraphNode tableParentNode;
+
+        // if a key-changing operation was applied upstream, repartition so the table is partitioned by key
+        if (repartitionRequired) {
+            final OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder = optimizableRepartitionNodeBuilder();
+            final String sourceName = createRepartitionedSource(
+                builder,
+                keySerdeOverride,
+                valueSerdeOverride,
+                name,
+                repartitionNodeBuilder
+            );
+
+            tableParentNode = repartitionNodeBuilder.build();
+            builder.addGraphNode(streamsGraphNode, tableParentNode);
+            subTopologySourceNodes = Collections.singleton(sourceName);
+        } else {
+            tableParentNode = streamsGraphNode;
+            subTopologySourceNodes = sourceNodes;
+        }
+
+        final KTableSource<K, V> tableSource = new KTableSource<>(
+            materializedInternal.storeName(),
+            materializedInternal.queryableStoreName()
+        );
+        final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(tableSource, name);
+        final StreamsGraphNode tableNode = new StreamToTableNode<>(
+            name,
+            processorParameters,
+            materializedInternal
+        );
+
+        builder.addGraphNode(tableParentNode, tableNode);
+
+        return new KTableImpl<K, V, V>(
+            name,
+            keySerdeOverride,
+            valueSerdeOverride,
+            subTopologySourceNodes,
+            materializedInternal.queryableStoreName(),
+            tableSource,
+            tableNode,
+            builder
+        );
+    }
+
     @Override
     public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> keySelector) {
         return groupBy(keySelector, Grouped.with(null, valSerde));
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
new file mode 100644
index 0000000000000..99eed1e3ee443
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamToTableNode.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.internals.KTableSource;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
+import org.apache.kafka.streams.kstream.internals.TimestampedKeyValueStoreMaterializer;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+
+/**
+ * Represents a {@link org.apache.kafka.streams.kstream.KTable} that is converted from a
+ * {@link org.apache.kafka.streams.kstream.KStream}.
+ */
+public class StreamToTableNode<K, V> extends StreamsGraphNode {
+
+    private final ProcessorParameters<K, V> processorParameters;
+    private final MaterializedInternal<K, V, ?> materializedInternal;
+
+    public StreamToTableNode(final String nodeName,
+                             final ProcessorParameters<K, V> processorParameters,
+                             final MaterializedInternal<K, V, ?> materializedInternal) {
+        super(nodeName);
+        this.processorParameters = processorParameters;
+        this.materializedInternal = materializedInternal;
+    }
+
+    @Override
+    public String toString() {
+        return "StreamToTableNode{" +
+            "processorParameters=" + processorParameters +
+            ", materializedInternal=" + materializedInternal +
+            "} " + super.toString();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
+        final StoreBuilder<TimestampedKeyValueStore<K, V>> storeBuilder =
+            new TimestampedKeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize();
+
+        final String processorName = processorParameters.processorName();
+        final KTableSource<K, V> ktableSource = (KTableSource<K, V>) processorParameters.processorSupplier();
+        topologyBuilder.addProcessor(processorName, processorParameters.processorSupplier(), parentNodeNames());
+
+        // only attach the state store if the resulting table is queryable
+        if (storeBuilder != null && ktableSource.queryableName() != null) {
+            topologyBuilder.addStateStore(storeBuilder, processorName);
+        }
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 6361c7aa07868..12bd0eb8ac04a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -18,11 +18,16 @@
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -33,6 +38,7 @@
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Produced;
@@ -51,30 +57,45 @@
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.SourceNode;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static java.time.Duration.ofMillis;
 import static java.util.Arrays.asList;
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.hamcrest.core.IsNull.notNullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
@@ -1529,6 +1550,8 @@ public void shouldPropagateRepartitionFlagAfterGlobalKTableJoin() {
             .groupByKey()
             .count();
 
+        final String topologyDescription = builder.build().describe().toString();
+
         final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
         final String topology = builder.build().describe().toString();
         final Matcher matcher = repartitionTopicPattern.matcher(topology);
@@ -1536,6 +1559,7 @@ public void shouldPropagateRepartitionFlagAfterGlobalKTableJoin() {
         final String match = matcher.group();
         assertThat(match, notNullValue());
         assertTrue(match.endsWith("repartition"));
+
     }
 
     @Test
@@ -2330,4 +2354,610 @@ public void shouldNotAllowNullNamedOnProcessWithStores() {
 
         assertThat(exception.getMessage(), equalTo("named can't be null"));
     }
+
+    @Test
+    public void shouldNotMaterializeKTableFromKStream() {
+        final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final String input = "input";
+        final String output = "output";
+
+        builder.stream(input, consumed).toTable().toStream().to(output);
+
+        final String topologyDescription = builder.build().describe().toString();
+
+        assertThat(
+            topologyDescription,
+            equalTo("Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" +
+                "      --> KSTREAM-TOTABLE-0000000002\n" +
+                "    Processor: KSTREAM-TOTABLE-0000000002 (stores: [])\n" +
+                "      --> KTABLE-TOSTREAM-0000000003\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n" +
+                "    Processor: KTABLE-TOSTREAM-0000000003 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000004\n" +
+                "      <-- KSTREAM-TOTABLE-0000000002\n" +
+                "    Sink: KSTREAM-SINK-0000000004 (topic: output)\n" +
+                "      <-- KTABLE-TOSTREAM-0000000003\n\n")
+        );
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                driver.createInputTopic(input, Serdes.String().serializer(), Serdes.String().serializer());
+            final TestOutputTopic<String, String> outputTopic =
+                driver.createOutputTopic(output, Serdes.String().deserializer(), Serdes.String().deserializer());
+
+            inputTopic.pipeInput("A", "01", 5L);
+            inputTopic.pipeInput("B", "02", 100L);
+            inputTopic.pipeInput("C", "03", 0L);
+            inputTopic.pipeInput("D", "04", 0L);
+            inputTopic.pipeInput("A", "05", 10L);
+            inputTopic.pipeInput("A", "06", 8L);
+
+            final List<TestRecord<String, String>> outputExpectRecords = new ArrayList<>();
+            outputExpectRecords.add(new TestRecord<>("A", "01", Instant.ofEpochMilli(5L)));
+            outputExpectRecords.add(new TestRecord<>("B", "02", Instant.ofEpochMilli(100L)));
+            outputExpectRecords.add(new TestRecord<>("C", "03", Instant.ofEpochMilli(0L)));
+            outputExpectRecords.add(new TestRecord<>("D", "04", Instant.ofEpochMilli(0L)));
+            outputExpectRecords.add(new TestRecord<>("A", "05", Instant.ofEpochMilli(10L)));
+            outputExpectRecords.add(new TestRecord<>("A", "06", Instant.ofEpochMilli(8L)));
+
+            assertEquals(outputExpectRecords, outputTopic.readRecordsToList());
+        }
+    }
+
+    @Test
+    public void shouldMaterializeKTableFromKStream() {
+        final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String storeName = "store";
+
+        final String input = "input";
+        builder.stream(input, consumed)
+            .toTable(Materialized.as(Stores.inMemoryKeyValueStore(storeName)));
+
+        final String topologyDescription = builder.build().describe().toString();
+
+        assertThat(
+            topologyDescription,
+            equalTo("Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" +
+                "      --> KSTREAM-TOTABLE-0000000001\n" +
+                "    Processor: KSTREAM-TOTABLE-0000000001 (stores: [store])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n")
+        );
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                driver.createInputTopic(input, Serdes.String().serializer(), Serdes.String().serializer());
+            final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
+
+            inputTopic.pipeInput("A", "01");
+            final Map<String, String> expectedStore = mkMap(mkEntry("A", "01"));
+
+            assertThat(asMap(store), is(expectedStore));
+        }
+    }
+
+    @Test
+    public void shouldSupportKeyChangeKTableFromKStream() {
+        final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final String input = "input";
+        final String output = "output";
+
+        builder.stream(input, consumed)
+            .map((key, value) -> new KeyValue<>(key + "-", value))
+            .toTable()
+            .toStream().to(output);
+
+        final Topology topology = builder.build();
+
+        final String topologyDescription = topology.describe().toString();
+
+        assertThat(
+            topologyDescription,
equalTo("Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" + + " --> KSTREAM-MAP-0000000001\n" + + " Processor: KSTREAM-MAP-0000000001 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000005\n" + + " <-- KSTREAM-SOURCE-0000000000\n" + + " Processor: KSTREAM-FILTER-0000000005 (stores: [])\n" + + " --> KSTREAM-SINK-0000000004\n" + + " <-- KSTREAM-MAP-0000000001\n" + + " Sink: KSTREAM-SINK-0000000004 (topic: KSTREAM-TOTABLE-0000000003-repartition)\n" + + " <-- KSTREAM-FILTER-0000000005\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000006 (topics: [KSTREAM-TOTABLE-0000000003-repartition])\n" + + " --> KSTREAM-TOTABLE-0000000003\n" + + " Processor: KSTREAM-TOTABLE-0000000003 (stores: [])\n" + + " --> KTABLE-TOSTREAM-0000000007\n" + + " <-- KSTREAM-SOURCE-0000000006\n" + + " Processor: KTABLE-TOSTREAM-0000000007 (stores: [])\n" + + " --> KSTREAM-SINK-0000000008\n" + + " <-- KSTREAM-TOTABLE-0000000003\n" + + " Sink: KSTREAM-SINK-0000000008 (topic: output)\n" + + " <-- KTABLE-TOSTREAM-0000000007\n\n") + ); + + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) { + final TestInputTopic inputTopic = + driver.createInputTopic(input, Serdes.String().serializer(), Serdes.String().serializer()); + final TestOutputTopic outputTopic = + driver.createOutputTopic(output, Serdes.String().deserializer(), Serdes.String().deserializer()); + + inputTopic.pipeInput("A", "01", 5L); + inputTopic.pipeInput("B", "02", 100L); + inputTopic.pipeInput("C", "03", 0L); + inputTopic.pipeInput("D", "04", 0L); + inputTopic.pipeInput("A", "05", 10L); + inputTopic.pipeInput("A", "06", 8L); + + final List> outputExpectRecords = new ArrayList<>(); + outputExpectRecords.add(new TestRecord<>("A-", "01", Instant.ofEpochMilli(5L))); + outputExpectRecords.add(new TestRecord<>("B-", "02", Instant.ofEpochMilli(100L))); + outputExpectRecords.add(new TestRecord<>("C-", "03", Instant.ofEpochMilli(0L))); + outputExpectRecords.add(new TestRecord<>("D-", "04", Instant.ofEpochMilli(0L))); + outputExpectRecords.add(new TestRecord<>("A-", "05", Instant.ofEpochMilli(10L))); + outputExpectRecords.add(new TestRecord<>("A-", "06", Instant.ofEpochMilli(8L))); + + assertEquals(outputTopic.readRecordsToList(), outputExpectRecords); + } + } + + + @Test + public void shouldSupportForeignKeyTableTableJoinWithKTableFromKStream() { + final Consumed consumed = Consumed.with(Serdes.String(), Serdes.String()); + final StreamsBuilder builder = new StreamsBuilder(); + + final String input1 = "input1"; + final String input2 = "input2"; + final String output = "output"; + + final KTable leftTable = builder.stream(input1, consumed).toTable(); + final KTable rightTable = builder.stream(input2, consumed).toTable(); + + final Function extractor = value -> value.split("\\|")[1]; + final ValueJoiner joiner = (value1, value2) -> "(" + value1 + "," + value2 + ")"; + + leftTable.join(rightTable, extractor, joiner).toStream().to(output); + + final Topology topology = builder.build(props); + + final String topologyDescription = topology.describe().toString(); + + assertThat( + topologyDescription, + equalTo("Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KTABLE-SOURCE-0000000016 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic])\n" + + " --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000017\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input1])\n" + + " --> KSTREAM-TOTABLE-0000000002\n" + + " Processor: 
+                "      --> KTABLE-FK-JOIN-OUTPUT-0000000018\n" +
+                "      <-- KTABLE-SOURCE-0000000016\n" +
+                "    Processor: KSTREAM-TOTABLE-0000000002 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000001])\n" +
+                "      --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000007\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n" +
+                "    Processor: KTABLE-FK-JOIN-OUTPUT-0000000018 (stores: [])\n" +
+                "      --> KTABLE-TOSTREAM-0000000020\n" +
+                "      <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000017\n" +
+                "    Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000007 (stores: [])\n" +
+                "      --> KTABLE-SINK-0000000008\n" +
+                "      <-- KSTREAM-TOTABLE-0000000002\n" +
+                "    Processor: KTABLE-TOSTREAM-0000000020 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000021\n" +
+                "      <-- KTABLE-FK-JOIN-OUTPUT-0000000018\n" +
+                "    Sink: KSTREAM-SINK-0000000021 (topic: output)\n" +
+                "      <-- KTABLE-TOSTREAM-0000000020\n" +
+                "    Sink: KTABLE-SINK-0000000008 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic)\n" +
+                "      <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000007\n" +
+                "\n" +
+                "   Sub-topology: 1\n" +
+                "    Source: KSTREAM-SOURCE-0000000003 (topics: [input2])\n" +
+                "      --> KSTREAM-TOTABLE-0000000005\n" +
+                "    Source: KTABLE-SOURCE-0000000009 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic])\n" +
+                "      --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000011\n" +
+                "    Processor: KSTREAM-TOTABLE-0000000005 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000004])\n" +
+                "      --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000013\n" +
+                "      <-- KSTREAM-SOURCE-0000000003\n" +
+                "    Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000011 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000010])\n" +
+                "      --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000012\n" +
+                "      <-- KTABLE-SOURCE-0000000009\n" +
+                "    Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000012 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000004])\n" +
+                "      --> KTABLE-SINK-0000000015\n" +
+                "      <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000011\n" +
+                "    Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000013 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000010])\n" +
+                "      --> KTABLE-SINK-0000000015\n" +
+                "      <-- KSTREAM-TOTABLE-0000000005\n" +
+                "    Sink: KTABLE-SINK-0000000015 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic)\n" +
+                "      <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000012, KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000013\n\n")
+        );
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
+            final TestInputTopic<String, String> left = driver.createInputTopic(input1, new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> right = driver.createInputTopic(input2, new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(output, new StringDeserializer(), new StringDeserializer());
+
+            // Pre-populate the RHS records. This test is all about what happens when we add/remove LHS records
+            right.pipeInput("rhs1", "rhsValue1");
+            right.pipeInput("rhs2", "rhsValue2");
+            right.pipeInput("rhs3", "rhsValue3"); // this unreferenced FK won't show up in any results
+
+            assertThat(outputTopic.readKeyValuesToMap(), is(emptyMap()));
+
+            left.pipeInput("lhs1", "lhsValue1|rhs1");
+            left.pipeInput("lhs2", "lhsValue2|rhs2");
+
+            final Map<String, String> expected = mkMap(
+                mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+                mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")
+            );
+            assertThat(outputTopic.readKeyValuesToMap(), is(expected));
+
+            // Add another reference to an existing FK
+            left.pipeInput("lhs3", "lhsValue3|rhs1");
+
+            assertThat(
+                outputTopic.readKeyValuesToMap(),
+                is(mkMap(
+                    mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
+                ))
+            );
+
+            left.pipeInput("lhs1", (String) null);
+            assertThat(
+                outputTopic.readKeyValuesToMap(),
+                is(mkMap(
+                    mkEntry("lhs1", null)
+                ))
+            );
+        }
+    }
+
+    @Test
+    public void shouldSupportTableTableJoinWithKStreamToKTable() {
+        final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final String leftTopic = "left";
+        final String rightTopic = "right";
+        final String outputTopic = "output";
+
+        final KTable<String, String> table1 = builder.stream(leftTopic, consumed).toTable();
+        final KTable<String, String> table2 = builder.stream(rightTopic, consumed).toTable();
+
+        table1.join(table2, MockValueJoiner.TOSTRING_JOINER).toStream().to(outputTopic);
+
+        final Topology topology = builder.build(props);
+        final String topologyDescription = topology.describe().toString();
+
+        assertThat(
+            topologyDescription,
+            equalTo("Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [left])\n" +
+                "      --> KSTREAM-TOTABLE-0000000002\n" +
+                "    Source: KSTREAM-SOURCE-0000000003 (topics: [right])\n" +
+                "      --> KSTREAM-TOTABLE-0000000005\n" +
+                "    Processor: KSTREAM-TOTABLE-0000000002 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000001])\n" +
+                "      --> KTABLE-JOINTHIS-0000000007\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n" +
+                "    Processor: KSTREAM-TOTABLE-0000000005 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000004])\n" +
+                "      --> KTABLE-JOINOTHER-0000000008\n" +
+                "      <-- KSTREAM-SOURCE-0000000003\n" +
+                "    Processor: KTABLE-JOINOTHER-0000000008 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000001])\n" +
+                "      --> KTABLE-MERGE-0000000006\n" +
+                "      <-- KSTREAM-TOTABLE-0000000005\n" +
+                "    Processor: KTABLE-JOINTHIS-0000000007 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000004])\n" +
+                "      --> KTABLE-MERGE-0000000006\n" +
+                "      <-- KSTREAM-TOTABLE-0000000002\n" +
+                "    Processor: KTABLE-MERGE-0000000006 (stores: [])\n" +
+                "      --> KTABLE-TOSTREAM-0000000009\n" +
+                "      <-- KTABLE-JOINTHIS-0000000007, KTABLE-JOINOTHER-0000000008\n" +
+                "    Processor: KTABLE-TOSTREAM-0000000009 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000010\n" +
+                "      <-- KTABLE-MERGE-0000000006\n" +
+                "    Sink: KSTREAM-SINK-0000000010 (topic: output)\n" +
+                "      <-- KTABLE-TOSTREAM-0000000009\n\n"));
+
+        final Collection<Set<String>> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(topology).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
+            final TestInputTopic<String, String> left = driver.createInputTopic(leftTopic, new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> right = driver.createInputTopic(rightTopic, new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> output =
+                driver.createOutputTopic(outputTopic, new StringDeserializer(), new StringDeserializer());
+
+            right.pipeInput("lhs1", "rhsValue1");
+            right.pipeInput("rhs2", "rhsValue2");
+            right.pipeInput("lhs3", "rhsValue3");
+
+            assertThat(output.readKeyValuesToMap(), is(emptyMap()));
+
+            left.pipeInput("lhs1", "lhsValue1");
+            left.pipeInput("lhs2", "lhsValue2");
+
+            final Map<String, String> expected = mkMap(
+                mkEntry("lhs1", "lhsValue1+rhsValue1")
+            );
+
+            assertThat(
+                output.readKeyValuesToMap(),
+                is(expected)
+            );
+
+            left.pipeInput("lhs3", "lhsValue3");
+
+            assertThat(
+                output.readKeyValuesToMap(),
+                is(mkMap(
+                    mkEntry("lhs3", "lhsValue3+rhsValue3")
+                ))
+            );
+
+            left.pipeInput("lhs1", "lhsValue4");
+            assertThat(
+                output.readKeyValuesToMap(),
+                is(mkMap(
+                    mkEntry("lhs1", "lhsValue4+rhsValue1")
+                ))
+            );
+        }
+    }
+
+    @Test
+    public void shouldSupportStreamTableJoinWithKStreamToKTable() {
+        final String streamTopic = "streamTopic";
+        final String tableTopic = "tableTopic";
+        final String outputTopic = "output";
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
+
+        final KStream<String, String> stream = builder.stream(streamTopic, consumed);
+        final KTable<String, String> table = builder.stream(tableTopic, consumed).toTable();
+
+        stream.join(table, MockValueJoiner.TOSTRING_JOINER).to(outputTopic);
+
+        final Topology topology = builder.build(props);
+        final String topologyDescription = topology.describe().toString();
+
+        assertThat(
+            topologyDescription,
+            equalTo("Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [streamTopic])\n" +
+                "      --> KSTREAM-JOIN-0000000004\n" +
+                "    Processor: KSTREAM-JOIN-0000000004 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000002])\n" +
+                "      --> KSTREAM-SINK-0000000005\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: [tableTopic])\n" +
+                "      --> KSTREAM-TOTABLE-0000000003\n" +
+                "    Sink: KSTREAM-SINK-0000000005 (topic: output)\n" +
+                "      <-- KSTREAM-JOIN-0000000004\n" +
+                "    Processor: KSTREAM-TOTABLE-0000000003 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000002])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n\n"));
+
+        final Collection<Set<String>> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(topology).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
+            final TestInputTopic<String, String> left = driver.createInputTopic(streamTopic, new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> right = driver.createInputTopic(tableTopic, new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> output = driver.createOutputTopic(outputTopic, new StringDeserializer(), new StringDeserializer());
+
+            right.pipeInput("lhs1", "rhsValue1");
+            right.pipeInput("rhs2", "rhsValue2");
+            right.pipeInput("lhs3", "rhsValue3");
+
+            assertThat(output.readKeyValuesToMap(), is(emptyMap()));
+
+            left.pipeInput("lhs1", "lhsValue1");
+            left.pipeInput("lhs2", "lhsValue2");
+
+            final Map<String, String> expected = mkMap(
+                mkEntry("lhs1", "lhsValue1+rhsValue1")
+            );
+
+            assertThat(
+                output.readKeyValuesToMap(),
+                is(expected)
+            );
+
+            left.pipeInput("lhs3", "lhsValue3");
+
+            assertThat(
+                output.readKeyValuesToMap(),
+                is(mkMap(
+                    mkEntry("lhs3", "lhsValue3+rhsValue3")
+                ))
+            );
+
+            left.pipeInput("lhs1", "lhsValue4");
+            assertThat(
+                output.readKeyValuesToMap(),
+                is(mkMap(
+                    mkEntry("lhs1", "lhsValue4+rhsValue1")
+                ))
+            );
+        }
+    }
+
+    @Test
+    public void shouldSupportGroupByCountWithKStreamToKTable() {
+        final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String input = "input";
+        final String output = "output";
+
+        builder
+            .stream(input, consumed)
+            .toTable()
+            .groupBy(MockMapper.selectValueKeyValueMapper(), Grouped.with(Serdes.String(), Serdes.String()))
+            .count()
+            .toStream()
+            .to(output);
+
+        final Topology topology = builder.build(props);
+        final String topologyDescription = topology.describe().toString();
+
+        assertThat(
+            topologyDescription,
+            equalTo("Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" +
+                "      --> KSTREAM-TOTABLE-0000000002\n" +
+                "    Processor: KSTREAM-TOTABLE-0000000002 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000001])\n" +
+                "      --> KTABLE-SELECT-0000000003\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n" +
+                "    Processor: KTABLE-SELECT-0000000003 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000005\n" +
+                "      <-- KSTREAM-TOTABLE-0000000002\n" +
+                "    Sink: KSTREAM-SINK-0000000005 (topic: KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition)\n" +
+                "      <-- KTABLE-SELECT-0000000003\n" +
+                "\n" +
+                "   Sub-topology: 1\n" +
+                "    Source: KSTREAM-SOURCE-0000000006 (topics: [KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition])\n" +
+                "      --> KTABLE-AGGREGATE-0000000007\n" +
+                "    Processor: KTABLE-AGGREGATE-0000000007 (stores: [KTABLE-AGGREGATE-STATE-STORE-0000000004])\n" +
+                "      --> KTABLE-TOSTREAM-0000000008\n" +
+                "      <-- KSTREAM-SOURCE-0000000006\n" +
+                "    Processor: KTABLE-TOSTREAM-0000000008 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000009\n" +
+                "      <-- KTABLE-AGGREGATE-0000000007\n" +
+                "    Sink: KSTREAM-SINK-0000000009 (topic: output)\n" +
+                "      <-- KTABLE-TOSTREAM-0000000008\n\n"));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic =
+                driver.createInputTopic(input, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestOutputTopic<String, Long> outputTopic =
+                driver.createOutputTopic(output, Serdes.String().deserializer(), Serdes.Long().deserializer());
+
+            inputTopic.pipeInput("A", "green", 10L);
+            inputTopic.pipeInput("B", "green", 9L);
+            inputTopic.pipeInput("A", "blue", 12L);
+            inputTopic.pipeInput("C", "yellow", 15L);
+            inputTopic.pipeInput("D", "green", 11L);
+
+            assertEquals(
+                asList(
+                    new TestRecord<>("green", 1L, Instant.ofEpochMilli(10)),
+                    new TestRecord<>("green", 2L, Instant.ofEpochMilli(10)),
+                    new TestRecord<>("green", 1L, Instant.ofEpochMilli(12)),
+                    new TestRecord<>("blue", 1L, Instant.ofEpochMilli(12)),
+                    new TestRecord<>("yellow", 1L, Instant.ofEpochMilli(15)),
+                    new TestRecord<>("green", 2L, Instant.ofEpochMilli(12))),
+                outputTopic.readRecordsToList());
+        }
+    }
+
+    @Test
+    public void shouldSupportTriggerMaterializedWithKTableFromKStream() {
+        final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String input = "input";
+        final String output = "output";
+        final String storeName = "store";
+
+        builder.stream(input, consumed)
+            .toTable()
+            .mapValues(
+                value -> value.charAt(0) - (int) 'a',
+                Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(storeName)
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.Integer()))
+            .toStream()
+            .to(output);
+
+        final Topology topology = builder.build(props);
+        final String topologyDescription = topology.describe().toString();
+
+        assertThat(
+            topologyDescription,
equalTo("Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" + + " --> KSTREAM-TOTABLE-0000000002\n" + + " Processor: KSTREAM-TOTABLE-0000000002 (stores: [])\n" + + " --> KTABLE-MAPVALUES-0000000003\n" + + " <-- KSTREAM-SOURCE-0000000000\n" + + " Processor: KTABLE-MAPVALUES-0000000003 (stores: [store])\n" + + " --> KTABLE-TOSTREAM-0000000004\n" + + " <-- KSTREAM-TOTABLE-0000000002\n" + + " Processor: KTABLE-TOSTREAM-0000000004 (stores: [])\n" + + " --> KSTREAM-SINK-0000000005\n" + + " <-- KTABLE-MAPVALUES-0000000003\n" + + " Sink: KSTREAM-SINK-0000000005 (topic: output)\n" + + " <-- KTABLE-TOSTREAM-0000000004\n\n")); + + try ( + final TopologyTestDriver driver = new TopologyTestDriver( + topology, + mkProperties(mkMap( + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"), + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test"), + mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("kafka-test").getAbsolutePath()) + )), + Instant.ofEpochMilli(0L))) { + final TestInputTopic inputTopic = + driver.createInputTopic(input, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + final TestOutputTopic outputTopic = + driver.createOutputTopic(output, Serdes.String().deserializer(), Serdes.Integer().deserializer()); + final KeyValueStore store = driver.getKeyValueStore(storeName); + + inputTopic.pipeInput("A", "green", 10L); + inputTopic.pipeInput("B", "green", 9L); + inputTopic.pipeInput("A", "blue", 12L); + inputTopic.pipeInput("C", "yellow", 15L); + inputTopic.pipeInput("D", "green", 11L); + + final Map expectedStore = new HashMap<>(); + expectedStore.putIfAbsent("A", 1); + expectedStore.putIfAbsent("B", 6); + expectedStore.putIfAbsent("C", 24); + expectedStore.putIfAbsent("D", 6); + + assertEquals(expectedStore, asMap(store)); + + assertEquals( + asList( + new TestRecord<>("A", 6, Instant.ofEpochMilli(10)), + new TestRecord<>("B", 6, Instant.ofEpochMilli(9)), + new TestRecord<>("A", 1, Instant.ofEpochMilli(12)), + new TestRecord<>("C", 24, Instant.ofEpochMilli(15)), + new TestRecord<>("D", 6, Instant.ofEpochMilli(11))), + outputTopic.readRecordsToList()); + + } + } + + private static Map asMap(final KeyValueStore store) { + final HashMap result = new HashMap<>(); + store.all().forEachRemaining(kv -> result.put(kv.key, kv.value)); + return result; + } }