[WIP: DO NOT MERGE] KAFKA-3522: Allow storing timestamps in RocksDB#6044
[WIP: DO NOT MERGE] KAFKA-3522: Allow storing timestamps in RocksDB#6044mjsax wants to merge 2 commits intoapache:trunkfrom
Conversation
| * @param aggregate the aggregated value for the session, it can be {@code null}; | ||
| * if the serialized bytes are also {@code null} it is interpreted as deletes | ||
| * @param timestamp the timestamp for the session; ignored if {@code aggregate} is {@code null} | ||
| * @throws NullPointerException If null is used for sessionKey. |
There was a problem hiding this comment.
I don't think you need to document runtime exceptions like this
There was a problem hiding this comment.
It common practice in Kafka (at least for Streams API) to document those. Some API accept null and others don't. It would be bad user experience to find out via trial-and-error IMHO.
| * @param <K> Type of keys | ||
| * @param <V> Type of values | ||
| */ | ||
| public interface WindowWithTimestampStore<K, V> extends WindowStore<K, ValueAndTimestamp<V>> { |
There was a problem hiding this comment.
Hard to parse this class name -- sounds like "trait Window with TimestampStore" instead of WindowStore<... ValueAndTimestamp...>
Maybe TimestampWindowStore or TimestampedWindowStore? shrug.
There was a problem hiding this comment.
Thanks for the suggestion. Please follow up on the DISCUSS thread on the dev mailing list with this. All public API changes should be discussed there.
| */ | ||
| class Segments { | ||
| private static final Logger log = LoggerFactory.getLogger(Segments.class); | ||
| abstract class AbstractSegments<S extends Segment> { |
There was a problem hiding this comment.
recommend sticking with T, U, V (or A, B, C for higher-kinded) type parameters
| import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; | ||
| import org.apache.kafka.streams.state.KeyValueStore; | ||
|
|
||
| class CachingKeyValueWithTimestampStore<K, V> extends CachingKeyValueStore<K, V> { |
There was a problem hiding this comment.
same here -- sounds like CachingKeyValue with TimestampStore
| super(underlying, keySerde, valueSerde); | ||
| } | ||
|
|
||
| void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) { |
There was a problem hiding this comment.
large and deeply nested method here -- recommend splitting into multiple smaller private methods where possible
| * Simple wrapper around a {@link SegmentedBytesStore} to support writing | ||
| * updates to a changelog | ||
| */ | ||
| class ChangeLoggingWindowWithTimestampBytesStore extends ChangeLoggingWindowBytesStore { |
There was a problem hiding this comment.
To avoid these very long class names, consider adding a factory method like ChangeLoggingWindowByteStore.newWithTimestamp() or something
| */ | ||
| class ChangeLoggingWindowWithTimestampBytesStore extends ChangeLoggingWindowBytesStore { | ||
|
|
||
| ChangeLoggingWindowWithTimestampBytesStore(final WindowStore<Bytes, byte[]> bytesStore, |
There was a problem hiding this comment.
since this wrapper just exposes a new constructor, consider making it a factory method instead
| * @param <K> The key type | ||
| * @param <V> The value type | ||
| */ | ||
| public interface KeyValueWithTimestampStore<K, V> extends KeyValueStore<K, ValueAndTimestamp<V>> { |
There was a problem hiding this comment.
possible alternative: implement KeyValueStore<K, V>, and then expose an additional putWithTimestamp, getWithTimestamp etc for callers that want ValueAndTimestamp<V> instead of V. This would probably require fewer code changes elsewhere.
| }, | ||
| restoreTime); | ||
| } else { | ||
| inner.init(context, root); |
There was a problem hiding this comment.
this pattern of if (shouldRecord) measureLatency(X) else X is not very DRY. You have this same condition (shouldRecord) in several places, and the code for measureLatency(X) vs X is essentially copy-pasted.
Instead, add maybeMeasureLatency(f, sensor) with if (sensor.shouldRecord()) ... .
| } | ||
| } | ||
|
|
||
| private interface Action<V> { |
There was a problem hiding this comment.
can this just be Producer<ValueAndTimestamp<V>> ?
No description provided.