KAFKA-3522: Add public interfaces for timestamped stores#6175
KAFKA-3522: Add public interfaces for timestamped stores#6175mjsax merged 3 commits intoapache:trunkfrom
Conversation
|
Call for review @guozhangwang @bbejeck @vvcephei @ableegoldman |
There was a problem hiding this comment.
nit: add a the in front of {@link SessionStore} and {@code TimestampedSessionStore}
There was a problem hiding this comment.
I prefer a instead of the though :)
There was a problem hiding this comment.
null check here on valueAndTimestamp?
There was a problem hiding this comment.
since innerIterator is only used to create a new KeyValueIteratorFacade maybe inline this to
return new KeyValueIteratorFacade<>(inner.range(from, to))
Here and below as well
For Note, that we will use the Does this make sense? |
39005a2 to
0bbe174
Compare
|
Updated and rebased. |
There was a problem hiding this comment.
I don't think it matters too much due to erasure and all the casting, but I think the reason you were prompted to use <Object,Object> instead of <K,V> as the parameters is that you forgot the generic parameters on KeyValueStoreFacade (so they are both expanded to Object). I'm a little surprised there was no warning for the raw-type return value.
I didn't test this, but I think this is what you wanted:
| return new KeyValueStoreFacade((TimestampedKeyValueStore<Object, Object>) store); | |
| return new KeyValueStoreFacade<K, V>((TimestampedKeyValueStore<K, V>) store); |
(also below)
There was a problem hiding this comment.
Ack. You suggestion is almost correct. It should be:
return new KeyValueStoreFacade<>((TimestampedKeyValueStore<K, V>) store);
0bbe174 to
d6b2dc1
Compare
guozhangwang
left a comment
There was a problem hiding this comment.
Made a pass on this ticket.
There was a problem hiding this comment.
Should we also state that the returned timestamp may also be unknown even though the value is not null? Ditto below.
There was a problem hiding this comment.
This would only happen during upgrade. Not sure if it is necessary?
Also, I don't plan to put any restrictions on the timestamp, ie, a PAPI use could put whatever she wants (including negative timestamps -- this would only fail if the store is logged because Kafka Producer does not allow to set negative timestamps.
Thinking about this twice, I am wondering if we should add a check for negative timestamps? Note, that the XxxStoreFaced also uses -1 as default timestamp on put(key, value) -- we could also use 0 as default though.
Thoughts? \cc @bbejeck @vvcephei @ableegoldman
There was a problem hiding this comment.
We should probably emphasize the difference between the timestamps used in windowed key and the timestamps associated with the value here, as they will be very confusing to users. Ditto for window store below.
There was a problem hiding this comment.
Ack.
Note, I also updated the JavaDocs of WindowStore and SessionStore.
There was a problem hiding this comment.
Inside the impl classes, what happens if the timestamp falls out of the key's session range?
There was a problem hiding this comment.
Nothing. We don't restrict / tie the timestamp to the window-range for maximum flexibility (even if in the DSL, we would want this restriction -- but I think, the PAPI store API should not enforce this restriction).
There was a problem hiding this comment.
Inside the impl classes, what happens if the timestamp falls out of the key's window range?
There was a problem hiding this comment.
Meta comment: I'm not sure if it is necessary to maintain compatibility in the test-utils code.
If you have a strong opinion for keeping this, then at least let's add one more comment here that users are recommended to make their test code changes sooner since otherwise it may leads to bad test coverage (e.g. your code actually has a bug that would result in incorrect timestamps, but because of you'd never need to make code changes here you may never capture it).
There was a problem hiding this comment.
I don't see a difference in test-utils package compare to other code -- it's also public API and we should be compatible -- that was the overall purpose of adding a test-utils package.
Will extend the comment and a WARN log statement that the test code should be updated.
There was a problem hiding this comment.
👍 on preserving compatibility. Also consider that the tests are more useful if the test-driver is roughly analogous to Streams.
There was a problem hiding this comment.
Should we prepare a new list of key-valuetimstamp and call the inner.putAll here?
There was a problem hiding this comment.
This is more efficient. If we prepare a new list, we iterator over the list effectively twice (once here, and a second time in inner.putAll()). And this code is simple/straight forward, so I don't think we gain reduced code complexity / code reuse that justifies this overhead.
There was a problem hiding this comment.
QQ: why these three classes only need to be in test-utils but not in streams internals? Are they not going to be needed for upgrade at all?
There was a problem hiding this comment.
We don't need them for upgrading. It's just for test-utils backwards compatibility.
The upgrade at runtime happens within RockDBTimestampedStore and for this case, we only need the get() compatibility that returns -1 if no timestamp is available yet. We don't need any write path backwards compatibility.
There was a problem hiding this comment.
This is a meta comment: I just realized that we are effectively having a duplicate API for each put operation in the extended class, like:
TimestampedWindowStore#put(K, ValueAndTimestamp<V>, long);
and
TimestampedWindowStore#put(K, V, long, long);
similarly for other two classes' put operations. I understand the first one is inherited and the second one is for more efficient store access, since there is often no reason to create a ValueAndTimestamp object if I already have a V value and a long timestamp at hand in my code. This duplicate API still degraded the API cleanness a bit: for example, for a customized TimestampedXXStore I need to implement two duplicated APIs still.
I'm wondering if there is a good way to go around it. If there's no better way at least we should consider just explicitly extend e.g. put(K, ValueAndTimestamp<V>, long) and default impl to the other API, and also add javadocs indicating that they have exactly the same semantics, and users should consider use the other one instead.
WDYT @bbejeck @vvcephei @ableegoldman ?
There was a problem hiding this comment.
As discussed in person, I think adding a default impl that "unwraps" the ValueAndTimestamp parameter into plain V value and long timestamp that calls the other/plain put() method seems to be a good idea.
There was a problem hiding this comment.
Huh, I previously overlooked this. Why do we need these new methods in addition to the equivalent ones we already have? IIRC, we agreed on ValueAndTimestamp to begin with to avoid having to create new store interfaces entirely.
There was a problem hiding this comment.
As discussed and announced in the KIP. Removed those methods.
|
Updated JavaDocs of (timestamped)window/session store classes and |
a358a70 to
d5a323f
Compare
|
Updates this:
DO NOT MERGE YET The change to We also had an open discussion about removing |
|
Updates this according to announced KIP changes. |
There was a problem hiding this comment.
I'm not sure I follow the need for this log message. As I read it, the method does exactly what it claims to do. But maybe I'm just not being imaginative.
Can you explain why you think folks will be confused by the method's behavior?
There was a problem hiding this comment.
I don't think people would be confused. The idea is, to educate/remind DSL users to update their code, because we do the store switch "under the hood". Not sure if WARN is the right log level (or if we want to log this at all). It was an idea, do I added it for discussion. Thoughts?
There was a problem hiding this comment.
Btw: My plan was to add the same log message to IQ code (in a follow up PR that adds IQ -- in this PR, we only add the interfaces and store-types, but IQ is not fully implemented in this PR.)
There was a problem hiding this comment.
Instead of WARN maybe INFO as it's not "harmful" but a subtle reminder for DSL users to update.
There was a problem hiding this comment.
It seems like a minor discrepancy with the KafkaStreams#store (IQ) counterpart... The IQ method will give back a KeyValueStore<K, ValueAndTimestamp<V>>, but this gives back a TimestampedKeyValueStore<K,V>.
I think I like the former one better, since it is more generic.
Can we change the return types of the TestDriver methods to match the IQ returned store types? It's an update to the kip, but it seems like a trivially minor one. (same with the window and session stores)
There was a problem hiding this comment.
I see your point. However, for IQ we return read-only stores (in contrast to the test driver).
To avoid adding new interfaces, we simply return ReadOnlyXxxStore<K, ValueAndTimestamp<V>> in IQ. We would need to introduce ReadOnlyTimestampedXxxStore<K, V> interfaces otherwise.
For the test driver, it will if fact return a TimestampedKeyValueStore<K, V> though that can be casted to KeyValueStore<K, ValueAndTimestamp<V>>. We needed to return TimestampedKeyValueStore originally, because we added new methods (that are now removed).
I personally don't see a big difference between both. Because the return type in IQ is different anyway (read-only vs read-write). I also don't see a big advantage in "aligning" the return types -- as I don't see an "alignment" to begin with (it's not the same type). (If we want the alignment, we should not introduce new interface, but follow your suggestion.)
If @guozhangwang @bbejeck think we should change it to KeyValueStore<K, ValueAndTimestamp<V>>, I am open to do the change. Just don't see a big necessity from my personal point of view.
There was a problem hiding this comment.
@mjsax , Thanks for that explanation. That makes sense why IQ needs to return the more generic type.
I'd say I'm +0.7 for changing to the return type here to KeyValueStore<K, ValueAndTimestamp<V>>.
- The type is more descriptive for users about what they can expect from the store
- The marker interface only exists for internal handling of the store. It's conflating the purpose of the interface to use it both as a marker interface and as an actual type.
- No cast is necessary, since
KeyValueStore<K, ValueAndTimestamp<V>>is a supertype ofTimestampedKeyValueStore<K, V>. When we get theTimestampedKeyValueStore<K, V>, we can simply return/assign it as aKeyValueStore<K, ValueAndTimestamp<V>>. - (a little bit repetitive from point 2) There's no value to the TTD users in giving them an extra interface to handle. More interfaces means more API surface area, which incrementally increases the mental load of using Streams.
At the end of the day, I don't think it's super harmful, so if you or the others prefer to keep it as-is, it's fine. Since this is a public API, I think it pays to consider it now, though.
There was a problem hiding this comment.
Initially, I was in favor of aligning the return types, but now that I remember we return ReadOnlyxxx variants we can't truly align without additional changes and considering I'm not sure it buys us much I'm now in favor of leaving as is.
There was a problem hiding this comment.
It's not so much about making the store the same as IQ as it is about not using the marker interface as both a marker and a caller-facing type. IMHO, there's no reason to do it, and no actual benefit from doing it, but there are downsides...
There was a problem hiding this comment.
I'd support @vvcephei 's argument to be consistent with IQ.
vvcephei
left a comment
There was a problem hiding this comment.
Left a request on the TopologyTestDriver return types, but otherwise, this all looks good to me.
Thanks!
-John
There was a problem hiding this comment.
Instead of WARN maybe INFO as it's not "harmful" but a subtle reminder for DSL users to update.
There was a problem hiding this comment.
Initially, I was in favor of aligning the return types, but now that I remember we return ReadOnlyxxx variants we can't truly align without additional changes and considering I'm not sure it buys us much I'm now in favor of leaving as is.
|
@mjsax Thanks for the changes. I just reviewed them, and I'm still 👍 on this PR. |
c9c37fc to
3aeb311
Compare
|
Rebased and updated this. This PR does not add session stores any longer. Call for review @guozhangwang @bbejeck @vvcephei @ableegoldman |
guozhangwang
left a comment
There was a problem hiding this comment.
Just a minor comment on the javadocs about windowed key explanation. Otherwise LGTM.
Please feel free to merge.
|
|
||
| /** | ||
| * Interface for storing the aggregated values of sessions | ||
| * Interface for storing the aggregated values of sessions. |
| /** | ||
| * Interface for storing the aggregated values of fixed-size time windows. | ||
| * <p> | ||
| * The key is internally represented as {@link Windowed Windowed<K>} that comprises the plain key |
There was a problem hiding this comment.
nit: internally represented -> in the form of, since it is not really some internal impl that's leaking here. Also I felt we can remove that comprises.. since it is captured in the javadoc of Windowed already.
| * A windowed store interface extending {@link StateStore}. | ||
| * Interface for storing the aggregated values of fixed-size time windows. | ||
| * <p> | ||
| * The key is internally represented as {@link Windowed Windowed<K>} that comprises the plain key |
There was a problem hiding this comment.
Ditto here. I'd rather update the javadoc of Windowed and just reference it from WindowStore / TimestampedWindowStore. Also I think we do not need to talk about impl key representation since it is internal.
Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
Add public interfaces for KIP-258 (key-value and window-store).
TimestampedBytesStoreis omitted because it's covered via KAFKA-3522: Replace RecordConverter with TimestampedBytesStore #6204Storesis omitted, because we need to add corresponding store builder classed firstAdditionally, adds some internal "facade" classes to allow accessing TimestampedStores with non-timestamped interfaces. This is part of the backward compatibility story of the KIP.
ReadOnlyXxxFaceclassed are part of main module because they will be use for IQ backward compatibility story in a follow up PRTopologyTestDriverwe need to give read/write access and add corresponding read/write facade classes