From 1d7065efec7dcd538e69b558187029ee6f07024b Mon Sep 17 00:00:00 2001 From: Daniele Bonelli Date: Thu, 6 Dec 2018 10:55:04 +0100 Subject: [PATCH] fixed ktable tests using kstream api --- .../streams/scala/kstream/KTableTest.scala | 84 +++++++++++-------- 1 file changed, 48 insertions(+), 36 deletions(-) diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala index dc080f133107e..0ef50e383c9ac 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala @@ -34,26 +34,32 @@ class KTableTest extends FlatSpec with Matchers with TestDriver { val sourceTopic = "source" val sinkTopic = "sink" - val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count() - table.filter((_, value) => value > 1).toStream.to(sinkTopic) + val table = builder.table[String, String](sourceTopic) + table.mapValues(_.length).filter((_, value) => value > 5).toStream.to(sinkTopic) val testDriver = createTestDriver(builder) { - testDriver.pipeRecord(sourceTopic, ("1", "value1")) - val record = testDriver.readRecord[String, Long](sinkTopic) + testDriver.pipeRecord(sourceTopic, ("1", "firstvalue")) + val record = testDriver.readRecord[String, Int](sinkTopic) record.key shouldBe "1" - record.value shouldBe (null: java.lang.Long) + record.value shouldBe 10 + } + { + testDriver.pipeRecord(sourceTopic, ("1", "secondvalue")) + val record = testDriver.readRecord[String, Int](sinkTopic) + record.key shouldBe "1" + record.value shouldBe 11 } { - testDriver.pipeRecord(sourceTopic, ("1", "value2")) - val record = testDriver.readRecord[String, Long](sinkTopic) + testDriver.pipeRecord(sourceTopic, ("1", "short")) + val record = testDriver.readRecord[String, Int](sinkTopic) record.key shouldBe "1" - record.value shouldBe 2 + record.value shouldBe (null: java.lang.Long) } { - testDriver.pipeRecord(sourceTopic, ("2", "value1")) - val record = testDriver.readRecord[String, Long](sinkTopic) + testDriver.pipeRecord(sourceTopic, ("2", "val3")) + val record = testDriver.readRecord[String, Int](sinkTopic) record.key shouldBe "2" record.value shouldBe (null: java.lang.Long) } @@ -67,30 +73,36 @@ class KTableTest extends FlatSpec with Matchers with TestDriver { val sourceTopic = "source" val sinkTopic = "sink" - val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count() - table.filterNot((_, value) => value > 1).toStream.to(sinkTopic) + val table = builder.table[String, String](sourceTopic) + table.filterNot((_, value) => value.exists(_.isUpper)).toStream.to(sinkTopic) val testDriver = createTestDriver(builder) { - testDriver.pipeRecord(sourceTopic, ("1", "value1")) - val record = testDriver.readRecord[String, Long](sinkTopic) + testDriver.pipeRecord(sourceTopic, ("1", "FirstValue")) + val record = testDriver.readRecord[String, String](sinkTopic) record.key shouldBe "1" - record.value shouldBe 1 + record.value shouldBe (null: java.lang.String) } { - testDriver.pipeRecord(sourceTopic, ("1", "value2")) - val record = testDriver.readRecord[String, Long](sinkTopic) + testDriver.pipeRecord(sourceTopic, ("1", "secondvalue")) + val record = testDriver.readRecord[String, String](sinkTopic) record.key shouldBe "1" - record.value shouldBe (null: java.lang.Long) + record.value shouldBe "secondvalue" } { - testDriver.pipeRecord(sourceTopic, ("2", "value1")) - val record = testDriver.readRecord[String, Long](sinkTopic) + testDriver.pipeRecord(sourceTopic, ("1", "Short")) + val record = testDriver.readRecord[String, String](sinkTopic) + record.key shouldBe "1" + record.value shouldBe (null: java.lang.String) + } + { + testDriver.pipeRecord(sourceTopic, ("2", "val")) + val record = testDriver.readRecord[String, String](sinkTopic) record.key shouldBe "2" - record.value shouldBe 1 + record.value shouldBe "val" } - testDriver.readRecord[String, Long](sinkTopic) shouldBe null + testDriver.readRecord[String, String](sinkTopic) shouldBe null testDriver.close() } @@ -101,17 +113,17 @@ class KTableTest extends FlatSpec with Matchers with TestDriver { val sourceTopic2 = "source2" val sinkTopic = "sink" - val table1 = builder.stream[String, String](sourceTopic1).groupBy((key, _) => key).count() - val table2 = builder.stream[String, String](sourceTopic2).groupBy((key, _) => key).count() + val table1 = builder.table[String, Int](sourceTopic1) + val table2 = builder.table[String, Int](sourceTopic2) table1.join(table2)((a, b) => a + b).toStream.to(sinkTopic) val testDriver = createTestDriver(builder) - testDriver.pipeRecord(sourceTopic1, ("1", "topic1value1")) - testDriver.pipeRecord(sourceTopic2, ("1", "topic2value1")) - testDriver.readRecord[String, Long](sinkTopic).value shouldBe 2 + testDriver.pipeRecord(sourceTopic1, ("1", 3)) + testDriver.pipeRecord(sourceTopic2, ("1", 2)) + testDriver.readRecord[String, Int](sinkTopic).value shouldBe 5 - testDriver.readRecord[String, Long](sinkTopic) shouldBe null + testDriver.readRecord[String, Int](sinkTopic) shouldBe null testDriver.close() } @@ -122,20 +134,20 @@ class KTableTest extends FlatSpec with Matchers with TestDriver { val sourceTopic2 = "source2" val sinkTopic = "sink" val stateStore = "store" - val materialized = Materialized.as[String, Long, ByteArrayKeyValueStore](stateStore) + val materialized = Materialized.as[String, Int, ByteArrayKeyValueStore](stateStore) - val table1 = builder.stream[String, String](sourceTopic1).groupBy((key, _) => key).count() - val table2 = builder.stream[String, String](sourceTopic2).groupBy((key, _) => key).count() + val table1 = builder.table[String, Int](sourceTopic1) + val table2 = builder.table[String, Int](sourceTopic2) table1.join(table2, materialized)((a, b) => a + b).toStream.to(sinkTopic) val testDriver = createTestDriver(builder) - testDriver.pipeRecord(sourceTopic1, ("1", "topic1value1")) - testDriver.pipeRecord(sourceTopic2, ("1", "topic2value1")) - testDriver.readRecord[String, Long](sinkTopic).value shouldBe 2 - testDriver.getKeyValueStore[String, Long](stateStore).get("1") shouldBe 2 + testDriver.pipeRecord(sourceTopic1, ("1", 1)) + testDriver.pipeRecord(sourceTopic2, ("1", 3)) + testDriver.readRecord[String, Int](sinkTopic).value shouldBe 4 + testDriver.getKeyValueStore[String, Int](stateStore).get("1") shouldBe 4 - testDriver.readRecord[String, Long](sinkTopic) shouldBe null + testDriver.readRecord[String, Int](sinkTopic) shouldBe null testDriver.close() }