KAFKA-3522: Add RocksDBTimestampedSegmentedBytesStore #6186
mjsax merged 4 commits into apache:trunk
Conversation
Call for review @guozhangwang @bbejeck @vvcephei @ableegoldman
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;

public class AbstractSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore {
This replaces/generalizes RocksDBSegmentedBytesStore to work with both segment types.
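As a rough illustration of the generalization (simplified, hypothetical class names; the real store carries much more logic), the shared behavior lives in an abstract class parameterized over the segment type:

```java
// Hypothetical, stripped-down sketch of generalizing a store over its
// segment type with a type parameter, as the PR does with
// AbstractSegmentedBytesStore<S extends Segment>.
interface Segment {
    String name();
}

class KeyValueSegment implements Segment {
    public String name() { return "kv-segment"; }
}

class TimestampedSegment implements Segment {
    public String name() { return "ts-segment"; }
}

abstract class AbstractStore<S extends Segment> {
    private final S segment;
    AbstractStore(final S segment) { this.segment = segment; }
    // shared logic lives here; subclasses only choose the segment type
    String describe() { return "store over " + segment.name(); }
}

class PlainStore extends AbstractStore<KeyValueSegment> {
    PlainStore() { super(new KeyValueSegment()); }
}

class TimestampedStore extends AbstractStore<TimestampedSegment> {
    TimestampedStore() { super(new TimestampedSegment()); }
}

public class GenericStoreSketch {
    public static void main(final String[] args) {
        System.out.println(new PlainStore().describe());
        System.out.println(new TimestampedStore().describe());
    }
}
```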
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;

public class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
All logic is moved into AbstractSegmentedBytesStore.
bbejeck left a comment:
I've made a pass and overall this looks good to me. I only have some comments regarding testing.
Looks like TimestampedSegments and TimestampedSegment need unit tests.
Also, for StoresTest, do we want to add a test ensuring that the underlying SegmentedBytesStore is of the correct type (not sure how hard this will be)? I suggest this only because it will help now and in the near future when we switch to having KIP-258 enabled.
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
The name of this class implies that it's abstracting over all segmented bytes stores, but it depends on RocksDB... Should we rename the class to AbstractRocksDBSegmentedBytesStore? (Or abstract away the concept of a batch?)
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;

public interface BulkLoadingKeyValueStore extends KeyValueStore<Bytes, byte[]> {
It seems unnecessary for this interface to subtype KeyValueStore<Bytes, byte[]>. The methods it defines are unrelated to the key/value store.
Should this just be an independent interface to be mixed in to classes that support bulk loading? It could be named BulkLoadingStore.
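A minimal sketch of the suggested mix-in split, using simplified hypothetical types (the real store interfaces and method names differ): the bulk-loading contract stands alone, and a store opts in by implementing both interfaces.

```java
// Hypothetical sketch: bulk loading as an independent mix-in interface
// rather than a subtype of the key/value store.
import java.util.HashMap;
import java.util.Map;

interface KeyValueStore<K, V> {
    void put(K key, V value);
    V get(K key);
}

// Mix-in carrying only the bulk-loading contract (method name assumed).
interface BulkLoadingStore {
    void toggleDbForBulkLoading(boolean active);
}

// A store opts into bulk loading by implementing both interfaces.
class InMemoryStore implements KeyValueStore<String, String>, BulkLoadingStore {
    private final Map<String, String> map = new HashMap<>();
    private boolean bulkLoading = false;

    public void put(final String key, final String value) { map.put(key, value); }
    public String get(final String key) { return map.get(key); }
    public void toggleDbForBulkLoading(final boolean active) { bulkLoading = active; }
    boolean isBulkLoading() { return bulkLoading; }
}

public class MixinSketch {
    public static void main(final String[] args) {
        final InMemoryStore store = new InMemoryStore();
        store.toggleDbForBulkLoading(true);
        store.put("k", "v");
        System.out.println(store.get("k") + " bulkLoading=" + store.isBulkLoading());
    }
}
```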
final long retention,
final long segmentInterval,
final KeySchema keySchema) {
    super(name, metricScope, keySchema, new TimestampedSegments(name, retention, segmentInterval));
Instead of declaring two subclasses whose only function is to pass different arguments to the superclass constructor, can we just inline these constructors and pass the desired arguments at the call site?
We could. I prefer the current way because it seems easier to use.
I'm fine with either approach, but testing-coverage wise:
- If we want to keep a separate class, should we also add a RocksDBTimestampedSegmentedBytesStoreTest?
- If we just want to maintain a single class, say under RocksDBSegmentedBytesStore as the current AbstractRocksDBSegmentedBytesStore, then should we augment the existing test file with timestamps enabled as well?
Will add RocksDBTimestampedSegmentedBytesStoreTest.
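The trade-off discussed above can be sketched with simplified, hypothetical classes: a thin subclass exists only to forward fixed arguments to the superclass constructor, versus constructing the superclass directly at the call site.

```java
// Hypothetical sketch of the two constructor styles discussed.
class SegmentedStore {
    final String name;
    final String segmentsKind;
    SegmentedStore(final String name, final String segmentsKind) {
        this.name = name;
        this.segmentsKind = segmentsKind;
    }
}

// Option 1: a dedicated subclass that only fixes one constructor argument;
// call sites stay short and the intent is visible in the type name.
class TimestampedSegmentedStore extends SegmentedStore {
    TimestampedSegmentedStore(final String name) {
        super(name, "timestamped");
    }
}

public class ConstructorSketch {
    public static void main(final String[] args) {
        // Option 1: use the subclass.
        final SegmentedStore a = new TimestampedSegmentedStore("store-a");
        // Option 2: inline the arguments at the call site, no subclass needed.
        final SegmentedStore b = new SegmentedStore("store-b", "timestamped");
        System.out.println(a.segmentsKind + " " + b.segmentsKind);
    }
}
```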
Java8 passed. Java11 timed out. Retest this please.
Java 8 failed with https://issues.apache.org/jira/browse/KAFKA-7965. Java 11 passed. Retest this please.
guozhangwang left a comment:
I've got one meta question about the additional TimestampedSegments and TimestampedSegment classes; the others are all nits.
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;

public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore {
Just to clarify: there are no logic changes from the existing RocksDBSegmentedBytesStore, it's just extracted out, right? (If yes, I'll skip reviewing this file.)
Yes. It's the same code.
import java.io.IOException;

- public interface Segment extends StateStore {
+ public interface Segment extends KeyValueStore<Bytes, byte[]>, BulkLoadingStore {
We keep the extended super class intentionally so that in the future we can implement the segment differently (e.g., not based on RocksDB at all). But since this is an internal class anyway, we can always change it back when needed, so I guess this is fine; just leaving the context on this.
import java.io.IOException;
import java.util.Objects;

class TimestampedSegment extends RocksDBTimestampedStore implements Comparable<TimestampedSegment>, Segment {
What's the benefit of keeping this and the following TimestampedSegments apart from KeyValueSegment? Looking at the source code now, they seem exactly the same. Is it going to be changed in follow-up PRs?
They are both quite the same, but here we extend RocksDBTimestampedStore while the other class extends RocksDBStore. Because Java does not support multiple inheritance, we cannot share and inherit the code but need to duplicate it here.
Do you have an idea how we could avoid this?
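The single-inheritance constraint discussed above can be sketched with stripped-down, hypothetical class names: each segment must extend a different store base class, so shared segment plumbing cannot come from a common segment superclass and gets duplicated; interface default methods are one limited workaround for stateless logic.

```java
// Hypothetical sketch: two segment classes forced onto different base
// classes by single inheritance.
class RocksDBStore { String kind() { return "plain"; } }
class RocksDBTimestampedStore extends RocksDBStore { String kind() { return "timestamped"; } }

interface Segment {
    long id();
    // A default method is one way to share logic without a common superclass,
    // but it cannot hold state, so stateful plumbing still gets duplicated.
    default String describe() { return "segment-" + id(); }
}

class KeyValueSegment extends RocksDBStore implements Segment {
    public long id() { return 1L; }
}

class TimestampedSegment extends RocksDBTimestampedStore implements Segment {
    public long id() { return 2L; }
}

public class InheritanceSketch {
    public static void main(final String[] args) {
        System.out.println(new KeyValueSegment().describe() + " " + new KeyValueSegment().kind());
        System.out.println(new TimestampedSegment().describe() + " " + new TimestampedSegment().kind());
    }
}
```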
Fix bug revealed by new test
}
try {
    final WriteBatch batch = writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());
    segment.addToBatch(record, batch);
This is the bug fix: in the original code, the record was added to the batch directly (i.e., to the default column family). We now delegate this to the segment, which can decide to which column family the record is added.
void addToBatch(final byte[] key,
                final byte[] value,
                final WriteBatch batch) throws RocksDBException;
We add this new method to RocksDBAccessor for the fix.
- } else {
-     batch.put(columnFamily, entry.key.get(), entry.value);
- }
+ addToBatch(entry.key.get(), entry.value, batch);
Some refactoring to share more code, exploiting the newly added method (similar below).
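A toy model of the delegation pattern behind the fix, with no real RocksDB dependency (all names hypothetical): the caller hands the batch to an accessor, and each accessor decides which column family a record goes to, instead of the caller always writing to the default one.

```java
// Hypothetical, self-contained model of routing batch writes through an
// accessor that picks the column family.
import java.util.ArrayList;
import java.util.List;

class WriteBatch {
    final List<String> ops = new ArrayList<>();
    void put(final String cf, final String key, final String value) {
        ops.add(cf + ":" + key + "=" + value);
    }
}

interface Accessor {
    void addToBatch(String key, String value, WriteBatch batch);
}

// A plain accessor writes to the default column family.
class SingleCfAccessor implements Accessor {
    public void addToBatch(final String key, final String value, final WriteBatch batch) {
        batch.put("default", key, value);
    }
}

// A timestamped accessor routes records to its own column family.
class TimestampedCfAccessor implements Accessor {
    public void addToBatch(final String key, final String value, final WriteBatch batch) {
        batch.put("withTimestamp", key, value);
    }
}

public class BatchDelegationSketch {
    public static void main(final String[] args) {
        final WriteBatch batch = new WriteBatch();
        new SingleCfAccessor().addToBatch("k1", "v1", batch);
        new TimestampedCfAccessor().addToBatch("k2", "v2", batch);
        System.out.println(batch.ops);
    }
}
```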
import static org.junit.Assert.assertTrue;

@RunWith(Parameterized.class)
public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment> {
Refactored the test to reuse the test code for both segment types. It's the same test code as before, with some generic/abstract types and methods.
Updated this.
Ack. It's internal and we can refactor later if it turns out to be worth it. Merging as-is for now.
* apache/trunk:
  - KAFKA-8030: Fix flaky tests in TopicCommandWithAdminClientTest
  - fix compile error for example (apache#6526)
  - MINOR: Comment spelling nit
  - MINOR: Optimize ConnectionStressWorker
  - KAFKA-8034: Use automatic RPC generation in DeleteTopics
  - MINOR: Move KTable source topic for changelog to optimization framework (apache#6500)
  - KAFKA-7502: Cleanup KTable materialization logic in a single place (doMapValues) (apache#6520)
  - Cleanup KTableImpl#doTransformValues (apache#6519)
  - MINOR: Streams input topic corrected (apache#6513)
  - MINOR: WorkerUtils#abort: fix bug in abort logic (apache#6516)
  - KAFKA-7502: Cleanup KTable materialization logic in a single place (filter) (apache#6453)
  - MINOR: Fix some spelling corrections in comments (apache#6507)
  - KAFKA-3522: Add RocksDBTimestampedSegmentedBytesStore (apache#6186)
  - MINOR: Add 2.2.0 upgrade instructions (apache#6501)
Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
Part of KIP-258.