diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala index 5f3c6bed6e065..e8e63e84331eb 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala @@ -299,6 +299,26 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, V]): Unit = inner.to(extractor, produced) + /** + * Convert this stream to a [[KTable]]. + * + * @return a [[KTable]] that contains the same records as this [[KStream]] + * @see `org.apache.kafka.streams.kstream.KStream#toTable` + */ + def toTable: KTable[K, V] = + new KTable(inner.toTable) + + /** + * Convert this stream to a [[KTable]]. + * + * @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]] + * should be materialized. + * @return a [[KTable]] that contains the same records as this [[KStream]] + * @see `org.apache.kafka.streams.kstream.KStream#toTable` + */ + def toTable(materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = + new KTable(inner.toTable(materialized)) + /** * Transform each record of the input stream into zero or more records in the output stream (both key and value type * can be altered arbitrarily). diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala index 14de557b9e6d8..e5a0aad21483d 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala @@ -312,4 +312,29 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver { testDriver.close() } + + "join 2 KStreamToTables" should "join correctly records" in { + val builder = new StreamsBuilder() + val sourceTopic1 = "source1" + val sourceTopic2 = "source2" + val sinkTopic = "sink" + + val table1 = builder.stream[String, String](sourceTopic1).toTable + val table2 = builder.stream[String, String](sourceTopic2).toTable + table1.join(table2)((a, b) => a + b).toStream.to(sinkTopic) + + val testDriver = createTestDriver(builder) + val testInput1 = testDriver.createInput[String, String](sourceTopic1) + val testInput2 = testDriver.createInput[String, String](sourceTopic2) + val testOutput = testDriver.createOutput[String, String](sinkTopic) + + testInput1.pipeInput("1", "topic1value1") + testInput2.pipeInput("1", "topic2value1") + + testOutput.readValue shouldBe "topic1value1topic2value1" + + testOutput.isEmpty shouldBe true + + testDriver.close() + } }