From bf132d145840933450820e94a8cd7595eff6b60d Mon Sep 17 00:00:00 2001 From: "high.lee" Date: Fri, 31 Jan 2020 13:38:38 +0900 Subject: [PATCH] KAFKA-9483; Add Scala KStream#toTable to the Streams DSL --- .../kafka/streams/scala/kstream/KStream.scala | 20 +++++++++++++++ .../streams/scala/kstream/KStreamTest.scala | 25 +++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala index 5f3c6bed6e065..e8e63e84331eb 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala @@ -299,6 +299,26 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, V]): Unit = inner.to(extractor, produced) + /** + * Convert this stream to a [[KTable]]. + * + * @return a [[KTable]] that contains the same records as this [[KStream]] + * @see `org.apache.kafka.streams.kstream.KStream#toTable` + */ + def toTable: KTable[K, V] = + new KTable(inner.toTable) + + /** + * Convert this stream to a [[KTable]]. + * + * @param materialized a `Materialized` that describes how the `StateStore` for the resulting [[KTable]] + * should be materialized. + * @return a [[KTable]] that contains the same records as this [[KStream]] + * @see `org.apache.kafka.streams.kstream.KStream#toTable` + */ + def toTable(materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] = + new KTable(inner.toTable(materialized)) + /** * Transform each record of the input stream into zero or more records in the output stream (both key and value type * can be altered arbitrarily). diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala index 14de557b9e6d8..e5a0aad21483d 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala @@ -312,4 +312,29 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver { testDriver.close() } + + "join 2 KStreamToTables" should "join correctly records" in { + val builder = new StreamsBuilder() + val sourceTopic1 = "source1" + val sourceTopic2 = "source2" + val sinkTopic = "sink" + + val table1 = builder.stream[String, String](sourceTopic1).toTable + val table2 = builder.stream[String, String](sourceTopic2).toTable + table1.join(table2)((a, b) => a + b).toStream.to(sinkTopic) + + val testDriver = createTestDriver(builder) + val testInput1 = testDriver.createInput[String, String](sourceTopic1) + val testInput2 = testDriver.createInput[String, String](sourceTopic2) + val testOutput = testDriver.createOutput[String, String](sinkTopic) + + testInput1.pipeInput("1", "topic1value1") + testInput2.pipeInput("1", "topic2value1") + + testOutput.readValue shouldBe "topic1value1topic2value1" + + testOutput.isEmpty shouldBe true + + testDriver.close() + } }