diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index a7687a9556449..8e9688494eeab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -1269,8 +1269,14 @@ private KStream doStreamTableJoin(final KTable table, throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join."); } bufferStoreName = Optional.of(name + "-Buffer"); - final RocksDBTimeOrderedKeyValueBuffer.Builder storeBuilder = - new RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferStoreName.get(), joined.gracePeriod(), name); + final RocksDBTimeOrderedKeyValueBuffer.Builder storeBuilder = + new RocksDBTimeOrderedKeyValueBuffer.Builder<>( + bufferStoreName.get(), + joinedInternal.keySerde() != null ? joinedInternal.keySerde() : keySerde, + joinedInternal.valueSerde() != null ? joinedInternal.valueSerde() : valueSerde, + joined.gracePeriod(), + name + ); builder.addStateStore(new StoreBuilderWrapper(storeBuilder)); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java index 2a8b3393aaa7f..fcad03d580d6e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java @@ -65,13 +65,23 @@ public class RocksDBTimeOrderedKeyValueBuffer implements TimeOrderedKeyVal public static class Builder implements StoreBuilder> { private final String storeName; + private final Serde keySerde; + private final Serde valueSerde; private boolean loggingEnabled = true; private Map logConfig = new HashMap<>(); private final Duration grace; private final String topic; - public Builder(final String storeName, final Duration grace, final String topic) { + public Builder( + final String storeName, + final Serde keySerde, + final Serde valueSerde, + final Duration grace, + final String topic + ) { this.storeName = storeName; + this.keySerde = keySerde; + this.valueSerde = valueSerde; this.grace = grace; this.topic = topic; } @@ -116,6 +126,8 @@ public StoreBuilder> withLoggingDisabled() { public TimeOrderedKeyValueBuffer build() { return new RocksDBTimeOrderedKeyValueBuffer<>( new RocksDBTimeOrderedKeyValueBytesStoreSupplier(storeName).get(), + keySerde, + valueSerde, grace, topic, loggingEnabled); @@ -139,10 +151,14 @@ public String name() { public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueBytesStore store, + final Serde keySerde, + final Serde valueSerde, final Duration gracePeriod, final String topic, final boolean loggingEnabled) { this.store = store; + this.keySerde = keySerde; + this.valueSerde = valueSerde; this.gracePeriod = gracePeriod.toMillis(); minTimestamp = store.minTimestamp(); minValid = false; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java index 8d6e2d61c5680..6d33d63724373 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java @@ -161,12 +161,17 @@ public static Collection data() { @BeforeClass public static void setupConfigsAndUtils() { STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); - STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL); } - void prepareEnvironment() throws InterruptedException { + void prepareEnvironment(final boolean setSerdes) throws InterruptedException { + if (setSerdes) { + STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); + STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + } else { + STREAMS_CONFIG.remove(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG); + STREAMS_CONFIG.remove(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG); + } if (!cacheEnabled) { STREAMS_CONFIG.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); } @@ -278,16 +283,13 @@ void runSelfJoinTestWithDriver(final List>> expect private void checkQueryableStore(final String queryableName, final TestRecord expectedFinalResult, final TopologyTestDriver driver) { final ReadOnlyKeyValueStore> store = driver.getTimestampedKeyValueStore(queryableName); - final KeyValueIterator> all = store.all(); - final KeyValue> onlyEntry = all.next(); + try (final KeyValueIterator> all = store.all()) { + final KeyValue> onlyEntry = all.next(); - try { assertThat(onlyEntry.key, is(expectedFinalResult.key())); assertThat(onlyEntry.value.value(), is(expectedFinalResult.value())); assertThat(onlyEntry.value.timestamp(), is(expectedFinalResult.timestamp())); assertThat(all.hasNext(), is(false)); - } finally { - all.close(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java index b1d2390dc8354..ab7d1ef743e5c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java @@ -56,7 +56,7 @@ public StreamStreamJoinIntegrationTest(final boolean cacheEnabled) { @Before public void prepareTopology() throws InterruptedException { - super.prepareEnvironment(); + super.prepareEnvironment(true); appID = "stream-stream-join-integration-test"; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java index e4e46ec69a2ea..abaea22cf8a39 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java @@ -57,7 +57,7 @@ public StreamTableJoinIntegrationTest(final boolean cacheEnabled) { @Before public void prepareTopology() throws InterruptedException { - super.prepareEnvironment(); + super.prepareEnvironment(true); appID = "stream-table-join-integration-test"; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java index cf859e46ba4d1..02d7e7e775443 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java @@ -19,10 +19,12 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.test.TestRecord; import org.apache.kafka.test.IntegrationTest; @@ -60,20 +62,20 @@ public StreamTableJoinWithGraceIntegrationTest(final boolean cacheEnabled) { @Before public void prepareTopology() throws InterruptedException { - super.prepareEnvironment(); appID = "stream-table-join-integration-test"; builder = new StreamsBuilder(); joined = Joined.with(Serdes.Long(), Serdes.String(), Serdes.String(), "Grace", Duration.ofMillis(2)); } @Test - public void testInnerWithVersionedStore() { + public void testInnerWithVersionedStore() throws Exception { + super.prepareEnvironment(false); STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner"); - leftStream = builder.stream(INPUT_TOPIC_LEFT); - rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as( + leftStream = builder.stream(INPUT_TOPIC_LEFT, Consumed.with(Serdes.Long(), Serdes.String())); + rightTable = builder.table(INPUT_TOPIC_RIGHT, Consumed.with(Serdes.Long(), Serdes.String()), Materialized.as( Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMinutes(5)))); - leftStream.join(rightTable, valueJoiner, joined).to(OUTPUT_TOPIC); + leftStream.join(rightTable, valueJoiner, joined).to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.String())); final List>> expectedResult = Arrays.asList( null, @@ -105,7 +107,8 @@ public void testInnerWithVersionedStore() { } @Test - public void testLeftWithVersionedStore() { + public void testLeftWithVersionedStore() throws Exception { + super.prepareEnvironment(true); STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left"); leftStream = builder.stream(INPUT_TOPIC_LEFT); @@ -141,4 +144,4 @@ public void testLeftWithVersionedStore() { runTestWithDriver(input, expectedResult); } -} \ No newline at end of file +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java index 24f4dc1face81..871ce50aaf91c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java @@ -56,7 +56,7 @@ public TableTableJoinIntegrationTest(final boolean cacheEnabled) { @Before public void prepareTopology() throws InterruptedException { - super.prepareEnvironment(); + super.prepareEnvironment(true); appID = "table-table-join-integration-test"; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java index 99bef27bf79c6..9c13883d7f580 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; @@ -57,18 +58,16 @@ public class RocksDBTimeOrderedKeyValueBufferTest { @Before public void setUp() { - when(serdeGetter.keySerde()).thenReturn(new Serdes.StringSerde()); - when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde()); final Metrics metrics = new Metrics(); offset = 0; streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime()); context = new MockInternalNewProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory()); } - private void createBuffer(final Duration grace) { + private void createBuffer(final Duration grace, final Serde serde) { final RocksDBTimeOrderedKeyValueBytesStore store = new RocksDBTimeOrderedKeyValueBytesStoreSupplier("testing").get(); - buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, grace, "testing", false); + buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, serde, serde, grace, "testing", false); buffer.setSerdesIfNull(serdeGetter); buffer.init((StateStoreContext) context, store); } @@ -81,14 +80,16 @@ private boolean pipeRecord(final String key, final String value, final long time @Test public void shouldReturnIfRecordWasAdded() { - createBuffer(Duration.ofMillis(1)); + when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde()); + when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde()); + createBuffer(Duration.ofMillis(1), null); assertThat(pipeRecord("K", "V", 2L), equalTo(true)); assertThat(pipeRecord("K", "V", 0L), equalTo(false)); } @Test public void shouldPutInBufferAndUpdateFields() { - createBuffer(Duration.ofMinutes(1)); + createBuffer(Duration.ofMinutes(1), Serdes.String()); assertNumSizeAndTimestamp(buffer, 0, Long.MAX_VALUE, 0); pipeRecord("1", "0", 0L); assertNumSizeAndTimestamp(buffer, 1, 0, 42); @@ -98,7 +99,7 @@ public void shouldPutInBufferAndUpdateFields() { @Test public void shouldAddAndEvictRecord() { - createBuffer(Duration.ZERO); + createBuffer(Duration.ZERO, Serdes.String()); final AtomicInteger count = new AtomicInteger(0); pipeRecord("1", "0", 0L); assertNumSizeAndTimestamp(buffer, 1, 0, 42); @@ -109,7 +110,9 @@ public void shouldAddAndEvictRecord() { @Test public void shouldAddAndEvictRecordTwice() { - createBuffer(Duration.ZERO); + when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde()); + when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde()); + createBuffer(Duration.ZERO, null); final AtomicInteger count = new AtomicInteger(0); pipeRecord("1", "0", 0L); assertNumSizeAndTimestamp(buffer, 1, 0, 42); @@ -125,7 +128,7 @@ public void shouldAddAndEvictRecordTwice() { @Test public void shouldAddAndEvictRecordTwiceWithNonZeroGrace() { - createBuffer(Duration.ofMillis(1)); + createBuffer(Duration.ofMillis(1), Serdes.String()); final AtomicInteger count = new AtomicInteger(0); pipeRecord("1", "0", 0L); buffer.evictWhile(() -> buffer.numRecords() > 0, r -> count.getAndIncrement()); @@ -139,7 +142,9 @@ public void shouldAddAndEvictRecordTwiceWithNonZeroGrace() { @Test public void shouldAddRecordsTwiceAndEvictRecordsOnce() { - createBuffer(Duration.ZERO); + when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde()); + when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde()); + createBuffer(Duration.ZERO, null); final AtomicInteger count = new AtomicInteger(0); pipeRecord("1", "0", 0L); buffer.evictWhile(() -> buffer.numRecords() > 1, r -> count.getAndIncrement()); @@ -151,7 +156,9 @@ public void shouldAddRecordsTwiceAndEvictRecordsOnce() { @Test public void shouldDropLateRecords() { - createBuffer(Duration.ZERO); + when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde()); + when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde()); + createBuffer(Duration.ZERO, null); pipeRecord("1", "0", 1L); assertNumSizeAndTimestamp(buffer, 1, 1, 42); pipeRecord("2", "0", 0L); @@ -160,7 +167,7 @@ public void shouldDropLateRecords() { @Test public void shouldDropLateRecordsWithNonZeroGrace() { - createBuffer(Duration.ofMillis(1)); + createBuffer(Duration.ofMillis(1), Serdes.String()); pipeRecord("1", "0", 2L); assertNumSizeAndTimestamp(buffer, 1, 2, 42); pipeRecord("2", "0", 1L); @@ -171,7 +178,9 @@ public void shouldDropLateRecordsWithNonZeroGrace() { @Test public void shouldHandleCollidingKeys() { - createBuffer(Duration.ofMillis(1)); + when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde()); + when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde()); + createBuffer(Duration.ofMillis(1), null); final AtomicInteger count = new AtomicInteger(0); pipeRecord("2", "0", 0L); buffer.evictWhile(() -> buffer.numRecords() > 0, r -> count.getAndIncrement()); @@ -196,4 +205,4 @@ private void assertNumSizeAndTimestamp(final TimeOrderedKeyValueBuffer