MINOR: Remove types from caching stores #6331
Conversation
guozhangwang
left a comment
I understand that the serdes used in the cached store are only needed to deserialize records when flushing to listeners, but after trying several approaches while reviewing this PR, I found no elegant way to move the deserialization elsewhere so that we could trim it from the caching store itself (I tried TupleForwarder and ForwardCacheFlushListener).
Compared with the current proposal, I'd prefer to keep the logic in the caching stores themselves.
@guozhangwang I found a better way to remove the types from caching stores and refactored this PR accordingly. Let me know what you think about this proposal.
We return true if the listener was registered, i.e., if the "metered store" wraps a "caching store".
Clever! I tried to do this change a little while ago and got hung up on this point.
Using CachedStateStore<byte[], byte[]> instead of CachedStateStore<Bytes, byte[]> because it seems cleaner overall (similar on other caching stores).
Below is just some method reordering within the class -- no actual code change.
Below is just some method reordering within the class -- no actual code change.
Below is just some method reordering within the class -- no actual code change.
Only moved close() and approximateNumEntries
We know the wrapping order; thus, this is safe. We intercept the callback below to deserialize.
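A minimal sketch of that interception, with made-up names and plain UTF-8 strings standing in for the real Kafka Streams types and serdes: the caching layer only ever sees a byte[] listener, and the metered layer wraps the user's typed listener and deserializes before forwarding.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;

// Illustrative sketch only -- these are not the actual Kafka Streams APIs.
public class NestedListenerSketch {
    // typed listener that the DSL registers on the "metered store"
    static BiConsumer<String, String> typedListener;
    // byte[] listener that the "metered store" registers on the caching store
    static BiConsumer<byte[], byte[]> bytesListener;

    static void setTypedFlushListener(final BiConsumer<String, String> listener) {
        typedListener = listener;
        // the metered layer intercepts the raw callback and deserializes
        bytesListener = (rawKey, rawValue) -> listener.accept(
            new String(rawKey, StandardCharsets.UTF_8),
            new String(rawValue, StandardCharsets.UTF_8));
    }

    // simulates the caching store evicting a dirty entry and firing the callback
    static void evict(final byte[] rawKey, final byte[] rawValue) {
        bytesListener.accept(rawKey, rawValue);
    }
}
```

The point of the sketch is that the caching layer never needs a serde: deserialization happens exactly once, in the layer that wraps it.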
Meta: I like this approach slightly more than the previous one, and I wonder if we can sort of combine the two to make the logic more readable (also, the metered store lost the topic name info when calling the serde, which may not be acceptable):
- The metered store does not need to inherit CachedStateStore; instead it inherits a SerdeStateStore interface that just exposes getters for the serdes (note we cannot use `serdes` since it is only constructed at initialization; we have to use `keySerde` and `valueSerde` instead).
- In TupleForwarder we get the SerdeStateStore and check if its wrapped store is a CachedStateStore; if yes, call `cachedStore.setFlushListener(... listener.apply(serdeStore.keySerde().deser ...))` -- i.e., basically the code below.
WDYT?
> and also the metered store lost the topic name info when calling serde which may not be acceptable

Fixed this part. See my other comments.
About your proposal: that is certainly possible; it's similar (or even the same?) to the original PR introducing TypedStore (your suggested SerdeStateStore is the same thing). So I am wondering what the difference from the original proposal would be -- it seems you suggest going back to the original one.
At a high level, our store ecosystem looks like an onion. On the outside, we have a <K,V> store, and on the inside, we have a <bytes,bytes> store. All the layers in between have different responsibilities, like changelogging, caching, adding metrics, etc.
The nice thing about this PR is that it gives us one clean layer that's responsible for the transition <K,V> <=> <bytes, bytes>. When we need to look at the de/serialization into/out-of the stores, we have exactly one place to look.
The prior code did mostly this, but to accommodate cache flushing in conjunction with the fact that the cache layer is below the transition from objects to bytes, we had to poke a hole in the onion and tell the caching layer (a bytes layer) how to deserialize. So, then there were two layers that independently know how to de/serialize, and the onion had a hole in it.
This idea to move the serialization out to the TupleForwarder is basically the same, but in the opposite direction. Again, there are two components that need to perform serialization (the serialization layer and the tuple forwarder), and again, we need to poke a hole in the onion so that the tuple forwarder can communicate directly with an inner layer.
It's not always practical to go for a "pure" design, but if readability is the goal, then it seems like we should try to avoid mixing layers, unwrapping layers, etc. as much as possible. To be fair, this is just my take on the situation.
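The layering described above can be sketched as follows. All class names here are illustrative stand-ins, not the real Kafka Streams classes: the innermost store and every decorator work purely on byte[], and exactly one outer layer owns the <K,V> <=> <byte[],byte[]> transition.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Toy model of the store "onion" -- not the actual Kafka Streams hierarchy.
public class OnionSketch {
    interface BytesStore {
        void put(byte[] key, byte[] value);
        byte[] get(byte[] key);
    }

    // innermost layer: a plain bytes store
    static class InMemoryBytesStore implements BytesStore {
        private final Map<String, byte[]> map = new HashMap<>();
        public void put(final byte[] key, final byte[] value) {
            map.put(new String(key, StandardCharsets.UTF_8), value);
        }
        public byte[] get(final byte[] key) {
            return map.get(new String(key, StandardCharsets.UTF_8));
        }
    }

    // a bytes-only decorator layer (caching, changelogging, ... would look alike)
    static class PassThroughLayer implements BytesStore {
        private final BytesStore inner;
        PassThroughLayer(final BytesStore inner) { this.inner = inner; }
        public void put(final byte[] key, final byte[] value) { inner.put(key, value); }
        public byte[] get(final byte[] key) { return inner.get(key); }
    }

    // the single layer responsible for the <K,V> <=> <byte[],byte[]> transition
    static class SerializingStore {
        private final BytesStore inner;
        SerializingStore(final BytesStore inner) { this.inner = inner; }
        void put(final String key, final String value) {
            inner.put(key.getBytes(StandardCharsets.UTF_8),
                      value.getBytes(StandardCharsets.UTF_8));
        }
        String get(final String key) {
            final byte[] raw = inner.get(key.getBytes(StandardCharsets.UTF_8));
            return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
        }
    }
}
```

In this shape there is exactly one place to look when reasoning about de/serialization, which is the property the comment above argues for.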
I also think that using two nested CacheFlushListeners is better than "drilling" holes into the APIs. So I prefer to stay with the current design.
Only moved close() and both findSessions() methods
Below is just some method reordering within the class -- no actual code change.
Below is just some method reordering within the class -- no actual code change.
Only Java 8 cleanup and code reformatting in this class.
Replaced flushed to reuse the existing CachingKeyValueStoreTest.CacheFlushListenerStub.
Because we assume that all "metered stores" implement CachedStateStore, we need to add it here, too.
Sorry, I couldn't follow... How is this a "metered store"?
Was it necessary to add the interface to get some tests to work?
It's not a metered store, but we assume a certain wrapping hierarchy, and GenericInMemoryKeyValueStore is at the same hierarchy level as the "metered stores" -- for this test case there is no metered store, but GenericInMemoryKeyValueStore takes the metered store's place in the hierarchy. So, yes, it was required to get the test to work.
Just to confirm: this line and below have no logical changes, just git diff messing up the line numbers, right?
In the caching store we are passing topic, but here null -- is that okay?
This slipped. We can get the topic from serdes.topic(). Will push a fix.
Thanks, @mjsax. I've made an initial pass and have a few minor comments.
EDIT: I meant to add that I like the new approach and consider it an improvement.
I also have some follow-up questions.
Since we've pushed the deserialization up a level, unless I'm missing something, it seems we are now bound to always wrap CachedStateStore instances in a metered store. If that's not the case, how will we deserialize from non-metered-wrapped caching stores?
Also, since this change is limited to caching stores, this PR will not affect Streams if caching is disabled, correct?
nit: cachingEnable -> cachingEnabled
nit: I'm wondering if just using the following would suffice (IMHO slightly easier to immediately grok the meaning):
if (!cachingEnable) {
    context.forward(key, new Change<>(newValue, oldValue));
}
That would work, too. IMHO, it's easier to avoid negation; the code reads better that way (personal taste). I also tried to use
if (cachingDisabled) {
    context.forward(key, new Change<>(newValue, oldValue));
}
but cachingDisabled is also weird. If you insist, I can still update it.
No, it's fine as is, my comment is subjective.
The PR replaces a while loop with a single statement; does this mean Streams is guaranteed to only ever have one level of WrappedStateStore?
TupleForwarder is part of the DSL, and thus we know the wrapping hierarchy; hence, this should be safe. We know that the caching store is either inside the metered store, or there is no caching store.
Would it make sense to relocate the problem of maybe-unwrapping to get a CachedStateStore into a static method in WrappedStateStore, like we did for the timestamped bytes stores ( https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java#L28 )?
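A hedged sketch of such a static helper, using simplified stand-in interfaces rather than the real WrappedStateStore API: it walks the wrapper chain and returns the caching layer if one exists, or null otherwise.

```java
// Illustrative only -- types and method names are made up for this sketch.
public class UnwrapSketch {
    interface StateStore { }
    interface CachedStateStore extends StateStore { }

    static class WrappedStateStore implements StateStore {
        final StateStore wrapped;
        WrappedStateStore(final StateStore wrapped) { this.wrapped = wrapped; }

        // walks the wrapper layers, analogous to the helper linked above
        static CachedStateStore maybeUnwrapCached(final StateStore store) {
            StateStore current = store;
            while (true) {
                if (current instanceof CachedStateStore) {
                    return (CachedStateStore) current;
                }
                if (current instanceof WrappedStateStore) {
                    current = ((WrappedStateStore) current).wrapped;
                } else {
                    return null; // no caching layer in this hierarchy
                }
            }
        }
    }

    // a caching layer is both a wrapper and a CachedStateStore
    static class CachingStore extends WrappedStateStore implements CachedStateStore {
        CachingStore(final StateStore wrapped) { super(wrapped); }
    }

    static class PlainStore implements StateStore { }
}
```

This keeps the unwrapping knowledge in one place instead of duplicating instanceof branches at each call site.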
An alternative is to also make the decorators implement CachedStateStore and just pass through to the store they wrap. Then, we don't need branches like this.
In fact, we could just add implements CachedStateStore to WrappedStateStore with a default, pass-through implementation.
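As an illustration of that alternative (simplified, hypothetical types; the real setFlushListener does not take a Runnable): decorators forward the call to their wrapped store, the caching layer registers the listener and returns true, and a store without a cache returns false, matching the "return true if the listener was registered" contract discussed above.

```java
// Sketch only -- stand-in types, not the Kafka Streams API.
public class PassThroughSketch {
    interface CachedStateStore {
        boolean setFlushListener(Runnable listener);
    }

    // decorator base: the pass-through behavior every wrapper inherits
    static class WrappedStateStore implements CachedStateStore {
        final CachedStateStore wrapped;
        WrappedStateStore(final CachedStateStore wrapped) { this.wrapped = wrapped; }
        @Override
        public boolean setFlushListener(final Runnable listener) {
            return wrapped.setFlushListener(listener);
        }
    }

    // innermost store without a cache: nothing to register
    static class PlainStore implements CachedStateStore {
        @Override
        public boolean setFlushListener(final Runnable listener) { return false; }
    }

    // caching layer: actually keeps the listener and reports success
    static class CachingStore extends WrappedStateStore {
        Runnable flushListener;
        CachingStore(final CachedStateStore wrapped) { super(wrapped); }
        @Override
        public boolean setFlushListener(final Runnable listener) {
            flushListener = listener;
            return true;
        }
    }
}
```

Callers then never branch on the wrapping hierarchy; the return value tells them whether a caching layer picked up the listener.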
Yes, but this is no limitation IMHO.
Correct. That is the whole idea of our store hierarchy. \cc @bbejeck
Force-pushed from 67e1df9 to 5d95689.
Updated this and addressed review comments. Also rebased to resolve conflicts. Added some new tests. Calling for review again. Based on the feedback, it seems better to use the "nested listener" approach to keep a clear separation of concerns instead of "drilling" holes into the store hierarchy layers.
guozhangwang
left a comment
Sounds good to me. Please feel free to merge after jenkins passed :)
* apache/trunk:
  - KAFKA-7880: Naming worker thread by task id (apache#6275)
  - improve some logging statements (apache#6078)
  - KAFKA-7312: Change broker port used in testMinimumRequestTimeouts and testForceClose
  - KAFKA-7997: Use automatic RPC generation in SaslAuthenticate
  - KAFKA-8002; Log dir reassignment stalls if future replica has different segment base offset (apache#6346)
  - KAFKA-3522: Add TimestampedKeyValueStore builder/runtime classes (apache#6152)
  - HOTFIX: add igore import to streams_upgrade_test
  - MINOR: ConsumerNetworkClient does not need to send the remaining requests when the node is not ready (apache#6264)
  - KAFKA-7922: Return authorized operations in describe consumer group responses (KIP-430 Part-1)
  - KAFKA-7918: Inline generic parameters Pt. III: in-memory window store (apache#6328)
  - KAFKA-8012; Ensure partitionStates have not been removed before truncating. (apache#6333)
  - KAFKA-8011: Fix for race condition causing concurrent modification exception (apache#6338)
  - KAFKA-7912: Support concurrent access in InMemoryKeyValueStore (apache#6336)
  - MINOR: Skip quota check when replica is in sync (apache#6344)
  - HOTFIX: Change header back to http instead of https to path license header test (apache#6347)
  - MINOR: fix release.py script (apache#6317)
  - MINOR: Remove types from caching stores (apache#6331)
  - MINOR: Improve logging for alter log dirs (apache#6302)
  - MINOR: state.cleanup.delay.ms default is 600,000 ms (10 minutes). (apache#6345)
  - MINOR: disable Streams system test for broker upgrade/downgrade (apache#6341)
* MINOR: remove types from caching stores
* Github comments and rebased
Removes all type information (including Serdes) from caching stores and moves it "one level up" (i.e., into the metered stores).
Impact:
- Caching stores now work on `<byte[],byte[]>` on flush.
- `TupleForwarder` registers the "typed" `FlushListener` on the "metered store" now (not on the caching store), and the "metered store" registers a `byte[]` `FlushListener` on the caching store. When the caching store evicts and calls the listener, the "metered store" deserializes the bytes and fires the original "typed" `FlushListener`.

This will unblock KIP-258 PR #6152 (i.e., the open discussion about flushing...)
Call for review @guozhangwang @bbejeck @vvcephei @ableegoldman
Committer Checklist (excluded from commit message)