Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,32 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
val sourceTopic = "source"
val sinkTopic = "sink"

val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count()
table.filter((_, value) => value > 1).toStream.to(sinkTopic)
val table = builder.table[String, String](sourceTopic)
table.mapValues(_.length).filter((_, value) => value > 5).toStream.to(sinkTopic)

val testDriver = createTestDriver(builder)

{
testDriver.pipeRecord(sourceTopic, ("1", "value1"))
val record = testDriver.readRecord[String, Long](sinkTopic)
testDriver.pipeRecord(sourceTopic, ("1", "firstvalue"))
val record = testDriver.readRecord[String, Int](sinkTopic)
record.key shouldBe "1"
record.value shouldBe (null: java.lang.Long)
record.value shouldBe 10
}
{
testDriver.pipeRecord(sourceTopic, ("1", "secondvalue"))
val record = testDriver.readRecord[String, Int](sinkTopic)
record.key shouldBe "1"
record.value shouldBe 11
}
{
testDriver.pipeRecord(sourceTopic, ("1", "value2"))
val record = testDriver.readRecord[String, Long](sinkTopic)
testDriver.pipeRecord(sourceTopic, ("1", "short"))
val record = testDriver.readRecord[String, Int](sinkTopic)
record.key shouldBe "1"
record.value shouldBe 2
record.value shouldBe (null: java.lang.Long)
}
{
testDriver.pipeRecord(sourceTopic, ("2", "value1"))
val record = testDriver.readRecord[String, Long](sinkTopic)
testDriver.pipeRecord(sourceTopic, ("2", "val3"))
val record = testDriver.readRecord[String, Int](sinkTopic)
record.key shouldBe "2"
record.value shouldBe (null: java.lang.Long)
}
Expand All @@ -67,30 +73,36 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
val sourceTopic = "source"
val sinkTopic = "sink"

val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count()
table.filterNot((_, value) => value > 1).toStream.to(sinkTopic)
val table = builder.table[String, String](sourceTopic)
table.filterNot((_, value) => value.exists(_.isUpper)).toStream.to(sinkTopic)

val testDriver = createTestDriver(builder)

{
testDriver.pipeRecord(sourceTopic, ("1", "value1"))
val record = testDriver.readRecord[String, Long](sinkTopic)
testDriver.pipeRecord(sourceTopic, ("1", "FirstValue"))
val record = testDriver.readRecord[String, String](sinkTopic)
record.key shouldBe "1"
record.value shouldBe 1
record.value shouldBe (null: java.lang.String)
}
{
testDriver.pipeRecord(sourceTopic, ("1", "value2"))
val record = testDriver.readRecord[String, Long](sinkTopic)
testDriver.pipeRecord(sourceTopic, ("1", "secondvalue"))
val record = testDriver.readRecord[String, String](sinkTopic)
record.key shouldBe "1"
record.value shouldBe (null: java.lang.Long)
record.value shouldBe "secondvalue"
}
{
testDriver.pipeRecord(sourceTopic, ("2", "value1"))
val record = testDriver.readRecord[String, Long](sinkTopic)
testDriver.pipeRecord(sourceTopic, ("1", "Short"))
val record = testDriver.readRecord[String, String](sinkTopic)
record.key shouldBe "1"
record.value shouldBe (null: java.lang.String)
}
{
testDriver.pipeRecord(sourceTopic, ("2", "val"))
val record = testDriver.readRecord[String, String](sinkTopic)
record.key shouldBe "2"
record.value shouldBe 1
record.value shouldBe "val"
}
testDriver.readRecord[String, Long](sinkTopic) shouldBe null
testDriver.readRecord[String, String](sinkTopic) shouldBe null

testDriver.close()
}
Expand All @@ -101,17 +113,17 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
val sourceTopic2 = "source2"
val sinkTopic = "sink"

val table1 = builder.stream[String, String](sourceTopic1).groupBy((key, _) => key).count()
val table2 = builder.stream[String, String](sourceTopic2).groupBy((key, _) => key).count()
val table1 = builder.table[String, Int](sourceTopic1)
val table2 = builder.table[String, Int](sourceTopic2)
table1.join(table2)((a, b) => a + b).toStream.to(sinkTopic)

val testDriver = createTestDriver(builder)

testDriver.pipeRecord(sourceTopic1, ("1", "topic1value1"))
testDriver.pipeRecord(sourceTopic2, ("1", "topic2value1"))
testDriver.readRecord[String, Long](sinkTopic).value shouldBe 2
testDriver.pipeRecord(sourceTopic1, ("1", 3))
testDriver.pipeRecord(sourceTopic2, ("1", 2))
testDriver.readRecord[String, Int](sinkTopic).value shouldBe 5

testDriver.readRecord[String, Long](sinkTopic) shouldBe null
testDriver.readRecord[String, Int](sinkTopic) shouldBe null

testDriver.close()
}
Expand All @@ -122,20 +134,20 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
val sourceTopic2 = "source2"
val sinkTopic = "sink"
val stateStore = "store"
val materialized = Materialized.as[String, Long, ByteArrayKeyValueStore](stateStore)
val materialized = Materialized.as[String, Int, ByteArrayKeyValueStore](stateStore)

val table1 = builder.stream[String, String](sourceTopic1).groupBy((key, _) => key).count()
val table2 = builder.stream[String, String](sourceTopic2).groupBy((key, _) => key).count()
val table1 = builder.table[String, Int](sourceTopic1)
val table2 = builder.table[String, Int](sourceTopic2)
table1.join(table2, materialized)((a, b) => a + b).toStream.to(sinkTopic)

val testDriver = createTestDriver(builder)

testDriver.pipeRecord(sourceTopic1, ("1", "topic1value1"))
testDriver.pipeRecord(sourceTopic2, ("1", "topic2value1"))
testDriver.readRecord[String, Long](sinkTopic).value shouldBe 2
testDriver.getKeyValueStore[String, Long](stateStore).get("1") shouldBe 2
testDriver.pipeRecord(sourceTopic1, ("1", 1))
testDriver.pipeRecord(sourceTopic2, ("1", 3))
testDriver.readRecord[String, Int](sinkTopic).value shouldBe 4
testDriver.getKeyValueStore[String, Int](stateStore).get("1") shouldBe 4

testDriver.readRecord[String, Long](sinkTopic) shouldBe null
testDriver.readRecord[String, Int](sinkTopic) shouldBe null

testDriver.close()
}
Expand Down