KAFKA-18479: RocksDBTimeOrderedKeyValueBuffer not initialized correctly#18490
KAFKA-18479: RocksDBTimeOrderedKeyValueBuffer not initialized correctly#18490mjsax merged 2 commits intoapache:trunkfrom
Conversation
RocksDBTimeOrderedKeyValueBuffer is not initialize with serdes provides via Joined, but always uses serdes from StreamsConfig.
|
|
||
| final KeyValueIterator<Long, ValueAndTimestamp<String>> all = store.all(); | ||
| final KeyValue<Long, ValueAndTimestamp<String>> onlyEntry = all.next(); | ||
| try (final KeyValueIterator<Long, ValueAndTimestamp<String>> all = store.all()) { |
| final KTable<Long, String> rightTable = builder.table(INPUT_TOPIC_RIGHT, Consumed.with(Serdes.Long(), Serdes.String()), Materialized.as( | ||
| Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMinutes(5)))); | ||
| final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled); | ||
| final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled, false); |
There was a problem hiding this comment.
To test Joined we use false here
| final KTable<Long, String> rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as( | ||
| Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMinutes(5)))); | ||
| final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled); | ||
| final Properties streamsConfig = setupConfigsAndUtils(cacheEnabled, true); |
There was a problem hiding this comment.
For leftJoin we stick with using StreamsConfig serde to test both setups.
| } | ||
|
|
||
| private void createBuffer(final Duration grace) { | ||
| private void createBuffer(final Duration grace, final Serde<String> serde) { |
There was a problem hiding this comment.
"randomizing" this test, to cover both code path.
…ly (#18490) RocksDBTimeOrderedKeyValueBuffer is not initialize with serdes provides via Joined, but always uses serdes from StreamsConfig. Reviewers: Bill Bejeck <bill@confluent.io>
…ly (#18490) RocksDBTimeOrderedKeyValueBuffer is not initialize with serdes provides via Joined, but always uses serdes from StreamsConfig. Reviewers: Bill Bejeck <bill@confluent.io>
…ly (apache#18490) RocksDBTimeOrderedKeyValueBuffer is not initialize with serdes provides via Joined, but always uses serdes from StreamsConfig. Reviewers: Bill Bejeck <bill@confluent.io>
|
Merged to |
| @@ -62,18 +62,16 @@ public class RocksDBTimeOrderedKeyValueBufferTest { | |||
| @SuppressWarnings({"rawtypes", "unchecked"}) | |||
There was a problem hiding this comment.
Sorry for the delayed feedback. To address the following warnings, could you please move these suppression annotations to the class-level?
> Task :streams:compileTestJava
Note: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: /home/chia7712/project/kafka/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
There was a problem hiding this comment.
Nice catch! -- I think we should never use class level suppression though. Opened a PR to suppress for the individual methods: #18580
…ly (apache#18490) RocksDBTimeOrderedKeyValueBuffer is not initialize with serdes provides via Joined, but always uses serdes from StreamsConfig. Reviewers: Bill Bejeck <bill@confluent.io>
…ly (apache#18490) RocksDBTimeOrderedKeyValueBuffer is not initialize with serdes provides via Joined, but always uses serdes from StreamsConfig. Reviewers: Bill Bejeck <bill@confluent.io>
…ly (apache#18490) RocksDBTimeOrderedKeyValueBuffer is not initialize with serdes provides via Joined, but always uses serdes from StreamsConfig. Reviewers: Bill Bejeck <bill@confluent.io>
RocksDBTimeOrderedKeyValueBuffer is not initialize with serdes provides via Joined, but always uses serdes from StreamsConfig.