KAFKA-3522: Add TimestampedKeyValueStore builder/runtime classes #6152
Conversation
Call for review @guozhangwang @bbejeck @vvcephei
Need to add missing unit tests, but this is blocked by #6151 atm.
Force-pushed from 2703c3b to fa29cd8
Code cleanup with suggestions from #6200. Can be reviewed but must be rebased before we can merge it.
That we need to implement this method -- which either wraps the data as ValueAndTimestamp or needs to duplicate code -- might be an indicator to use TimestampedKeyValueStore only as a marker interface that should not add this method. Similar for putIfAbsent(). This argument could be applied to window/session stores too.
It looks like this has become package-protected so that it can be accessed by subclasses, but because basically all the stores code is in one package, it has the effect of making this field visible and modifiable (since it's not final) to the entire stores module.
It would be better at least to keep the field private and add a package-protected getter.
serdes is set in child class now -- need to keep as-is.
Nevermind. Wrong class. Can be private again after refactoring.
Force-pushed from fa29cd8 to ef55ea9
Introducing this new method -- this requires code changes in the Window/Session caching stores (for both, there is no functional change; it's just refactoring)
I'd like to propose we think again, after merging 6191, whether this refactoring is worthwhile: it creates a FlushEntry object for each key-value pair to flush, while in 6191 we are always trying to serialize bytes -> Windowed<Bytes> -> Windowed<K> for the key, so the refactoring here may help less with code cleanliness.
Sure, let's revisit after #6191 is merged and I rebase this PR.
Need to make this package-private, because it's set in the child class (cf. #initStoreSerde())
👍 Thanks for the explanation.
Retest this please
nit: instead of having oldRawValue final, what about initializing it to null and then getting rid of the else branch? This is somewhat opinionated, but I find the latter easier to read.
I personally prefer using final -- immutability is a nice property. Not sure what @guozhangwang @vvcephei think.
Yeah that's fine with me, just a minor point
Also prefer to have final variables, and actually I prefer to have "complete" instead of "one-sided" conditionals, too. (See https://github.com/kubernetes/kubernetes/blob/ec2e767e59395376fa191d7c56a74f53936b7653/pkg/controller/volume/persistentvolume/pv_controller.go#L55 for a succinct justification of this style)
If you want to tighten it up, a ternary operator would do it.
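For illustration, a minimal sketch of the ternary style being discussed: the "complete" conditional keeps the local variable final. The names oldRawValue and sendOldValues are placeholders, not the PR's actual code.

```java
// Placeholder sketch: a complete (two-sided) conditional via the ternary
// operator lets the variable stay final instead of being assigned in only
// one branch of an if-statement.
public class TernaryStyle {
    static byte[] oldRawValue(final byte[] rawValue, final boolean sendOldValues) {
        final byte[] oldRawValue = sendOldValues ? rawValue : null;
        return oldRawValue;
    }
}
```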
nit: same comment as above regarding final and else statement.
nit: I thought for new tests with expected failures we used the convention of
try {
    // ... some code with failure ...
    fail(...);
} catch (final Exception expected) {
}
It's a minor point, just asking; I'm fine with this as is.
I think for single line tests, it's ok to use the annotation.
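A plain-Java sketch of that try/fail/catch convention, with fail() standing in for org.junit.Assert.fail() so the snippet has no JUnit dependency:

```java
// Sketch of the try/fail/catch test convention. fail() is a stand-in for
// org.junit.Assert.fail(); the code under test is a placeholder.
public class ExpectedFailureStyle {
    static void fail(final String message) {
        throw new AssertionError(message);
    }

    static boolean throwsNullPointerException() {
        try {
            java.util.Objects.requireNonNull(null); // the code expected to fail
            fail("expected a NullPointerException");
            return false; // not reached at runtime
        } catch (final NullPointerException expected) {
            return true; // the expected failure path
        }
    }
}
```

For a one-liner, the `@Test(expected = ...)` annotation expresses the same intent more compactly, which is the point made above.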
Force-pushed from 2646a87 to c709075
@guozhangwang @bbejeck @vvcephei @ableegoldman
It's unclear to me atm why we need this (cf. comment). Anyone?
This is related to the bug fix that I have on #6191.
Thinking about this, I'd prefer we merge 6191 first since there is quite some overlapping logic with this one, and rebasing 6191 after merging 6152 could be a much larger effort than 6191 then 6152. Wdyt?
Rebased this to resolve a merge conflict. Call for final review @guozhangwang
guozhangwang
left a comment
LGTM overall. I left a meta comment about merging 6191 first and then rebasing this one a bit, while reconsidering whether to do the flushEntry refactoring.
nit: call init(valueSerde) here?
The logic for serializing/deserializing valueAndTimestamp is scattered across multiple places, like here and in ValueAndTimestampSerde. Could we just reuse the logic in the latter class and not duplicate it?
I see the desire but it's a little tricky.
Here, we only have StateSerde<K,V> and there is no ValueAndTimestampSerde instance we could use. What we essentially need here is to split a byte[] that contains value-and-timestamp into its value and timestamp components. Also, we need to deserialize both components.
For ChangeloggingTimestampedKeyValueStore, it's slightly different: we need to split the byte[] into both components, too, but we need the value part as byte[] and only the timestamp is deserialized.
Thus, what we could do is add static methods to ValueAndTimestampSerde (or maybe ValueAndTimestampDeserializer, because it's only about deserialization) to consolidate this logic. Something like:
byte[] valueFromValueAndTimestamp(byte[] rawValueAndTimestamp): returns the raw value part
long timestampFromValueAndTimestamp(byte[] rawValueAndTimestamp): extracts the timestamp and returns it as long
WDYT?
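A hypothetical sketch of those two helpers. It assumes, for illustration only, that the binary layout is an 8-byte big-endian timestamp followed by the raw value; the class name is a placeholder and the actual layout is not confirmed by this thread.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helpers following the proposal above. Assumed layout:
// [8-byte timestamp][raw value bytes].
public class ValueAndTimestampHelpers {
    static byte[] valueFromValueAndTimestamp(final byte[] rawValueAndTimestamp) {
        // strip the leading 8 timestamp bytes; only the raw value part remains
        return Arrays.copyOfRange(rawValueAndTimestamp, 8, rawValueAndTimestamp.length);
    }

    static long timestampFromValueAndTimestamp(final byte[] rawValueAndTimestamp) {
        // read the leading 8 bytes as a big-endian long
        return ByteBuffer.wrap(rawValueAndTimestamp).getLong();
    }
}
```

This keeps the split-and-extract logic in one place and lets callers decompose only the component they need.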
Yeah, this sounds good to me. We don't have any obligation to stick with the normal Serde patterns, and it would be more efficient if we can decompose just the pieces we need when we don't need both. Regardless, it would be nice to consolidate the logic, and static methods on ValueAndTimestampSerde seems like the place to do it.
I like it too. We can add both the static ser / deser functions for extracting bytes, splitting, and deserializing / serializing separately etc. to ValueAndTimestampSerde, or just put them as two sets of static functions in ValueAndTimestampSerializer / Deserializer, so that the ones for serialization stay in the former, and vice versa.
Ditto here, we can just reuse the logic of ValueAndTimestamp<byte[]>.
I think I'm convinced that having added functions inside ValueAndTimestampSer .. here is a better approach.
I'd like to propose we think again, after merging 6191, whether this refactoring is worthwhile: it creates a FlushEntry object for each key-value pair to flush, while in 6191 we are always trying to serialize bytes -> Windowed<Bytes> -> Windowed<K> for the key, so the refactoring here may help less with code cleanliness.
Force-pushed from 2ab0007 to 185321e
@guozhangwang @bbejeck @vvcephei @ableegoldman Rebased this. Call for final review and merging. @guozhangwang Seems there are some older open questions -- please see my reply and let me know what you think (can still address the comment about scattered code...)
Is it worth mentioning here that this is a marker interface to notify Streams that this store uses the new (with timestamp) binary schema?
(note, I guess it will make more sense once KAFKA-7918 is complete)
This is not a marker interface (the marker interface is TimestampedBytesStore)
I'm sorry, but I still don't understand the motivation for constructing these objects. Do you mind explaining again?
Because CachingTimestampKeyValueStore needs to override flushEntry() (cf. https://github.com/apache/kafka/pull/6152/files#diff-25a648e31db0d5090ce7c5c5d247cf71R38)
I think @mjsax 's motivation is to centralize the logic for 1) deserializing old and new values, and 2) choosing the timestamp, upon flushing.
Personally, I felt it is a bit overkill, since we need to create a new class and also create a temporary object every time just to hold these three values before flushing: take the example of CachingTimestampedKeyValueStore line 40 -- the fact that we actually do not use the passed-in timestamp field at all, but only need to return the extracted valueTimestamp for flushing, makes me feel that this class is not necessary.
I've also reviewed the other PR (#6331) for deferring the deserialization from the caching store altogether and doing it in the flush listener only, but it is not very elegant either, and after trying several alternatives myself I felt that keeping the deserialization logic in the caching store for now is probably the "least bad option" we have at hand.
So I'd like to propose one final time not to use a FlushEntry class, but to just inline the logic of deserializing the old/new values, as well as choosing the timestamp to flush, within each class, which is:
- the deserializing logic of new / old values for KeyValue, Window, and Session stores is the same.
- the deserializing logic of new / old values for TimestampedKeyValue would be different (and potentially this logic can be centralized in one place when we have TimestampedWindow later).
As for which timestamp to flush, I actually do not know why we should use the deserialized timestamp from the cache rather than still use the timestamp field in the cache to flush? My understanding is that they should be the same right? In that case the logic of selecting the timestamp should all be the same.
I prefer to optimize for sharing code and following OO principles. However, I want to point out that I refactored #6331 and hope the new version helps to resolve the issue. If we can agree that moving the deserialization out of the caching layer is the right way, then this flushing logic will become quite simplified.
As for which timestamp to flush, I actually do not know why we should use the deserialized timestamp from the cache rather than still use the timestamp field in the cache to flush? My understanding is that they should be the same right? In that case the logic of selecting the timestamp should all be the same.
As commented here, they are not the same and we need to get the timestamp from the value; using the timestamp from the LRUCacheEntry would be incorrect: #6152 (comment)
This seems not addressed? Line 122 below is inconsistent with this one.
Yeah, this sounds good to me. We don't have any obligation to stick with the normal Serde patterns, and it would be more efficient if we can decompose just the pieces we need when we don't need both. Regardless, it would be nice to consolidate the logic, and static methods on ValueAndTimestampSerde seems like the place to do it.
👍 Thanks for the explanation.
Java11 timed out (known issue): https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/19612/ Retest this please.
Retest this please
retest this please
guozhangwang
left a comment
Made another pass on the JIRA.
This is still following the meta comment I had before:
It seems this function is used for slightly different logic of deserializing the bytes into objects for different caching stores. And actually the logic should be the same for all non-timestamped stores, and different for timestamped stores. So I'm wondering if it is better to just have two static functions: one that will be used by all non-timestamped stores to deserialize a value into an object with the serde, and one that will be used by all timestamped stores, which takes an additional timestamp parameter that "merges" the two into the object. By doing this we still centralize the logic in a single place, without needing to create an entry object every time, which looks like overkill to me.
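The two static functions suggested above could look roughly like this. A hypothetical sketch only: Deserializer and ValueAndTimestamp are simplified stand-ins for the Kafka Streams types, and all names are placeholders.

```java
// Sketch of the suggestion: one helper for non-timestamped stores, one for
// timestamped stores that additionally merges in the timestamp, with no
// per-entry wrapper object needed.
public class CacheDeserializers {
    interface Deserializer<V> { V deserialize(byte[] bytes); }

    static final class ValueAndTimestamp<V> {
        final V value;
        final long timestamp;
        ValueAndTimestamp(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // used by all non-timestamped caching stores
    static <V> V deserializeValue(final Deserializer<V> serde, final byte[] rawValue) {
        return rawValue == null ? null : serde.deserialize(rawValue);
    }

    // used by all timestamped caching stores: merges value and timestamp
    static <V> ValueAndTimestamp<V> deserializeValueAndTimestamp(final Deserializer<V> serde,
                                                                 final byte[] rawValue,
                                                                 final long timestamp) {
        return rawValue == null ? null : new ValueAndTimestamp<>(serde.deserialize(rawValue), timestamp);
    }
}
```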
I agree that we should clean this up, but it should be done in a follow-up PR: in fact, all caching stores should be non-generic <Bytes, byte[]> stores and we should not pass any serdes to them to begin with. I would like to exclude this cleanup from this PR though.
If you want, I can also do a cleanup PR first, before we merge this, and rebase onto the cleanup PR, to avoid intermediate messy code in trunk. Thoughts?
This seems not addressed? Line 122 below is inconsistent with this one.
In the previous class we just call flushEntry.timestamp, while in line 211 below we did not; is that intentional?
Good catch! It should be flushEntry.timestamp here, too.
I like it too. We can add both the static ser / deser functions for extracting bytes, splitting, and deserializing / serializing separately etc. to ValueAndTimestampSerde, or just put them as two sets of static functions in ValueAndTimestampSerializer / Deserializer, so that the ones for serialization stay in the former, and vice versa.
Although it seems not possible to avoid data copying (unless we use sun.unsafe..), I still think we can save allocating a new ByteBuffer here and just use byte array copyOfRange or System.arraycopy directly. Wdyt?
And it makes me think: would it be worth adding another function in the serializer classes that takes three parameters -- byte array, offset, and length -- so we can save the copying as well (since it is on the critical path, I'd like to put on my paranoid hat again) :) ? cc @vvcephei as well.
Just clarifying that the above comment is not for addressing in this PR; just bringing it up as maybe worth considering in the near future.
We need to allocate new byte arrays in any case: copyOfRange() creates a new array internally, and System.arraycopy requires passing in a destination array. That was the reason why we agreed to use ByteBuffer throughout the whole code base: it's the most convenient API, and we cannot avoid new byte arrays no matter which API we use.
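For illustration, a sketch of the three extraction options discussed; each one ends up with a freshly allocated destination array. The 8-byte offset mimics stripping a serialized timestamp prefix and is an assumption here, as are the method names.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// All three APIs produce a new destination array; none avoids the copy.
public class CopyComparison {
    static byte[] viaCopyOfRange(final byte[] src) {
        return Arrays.copyOfRange(src, 8, src.length); // allocates a new array internally
    }

    static byte[] viaArraycopy(final byte[] src) {
        final byte[] dest = new byte[src.length - 8];  // caller allocates the destination
        System.arraycopy(src, 8, dest, 0, dest.length);
        return dest;
    }

    static byte[] viaByteBuffer(final byte[] src) {
        final byte[] dest = new byte[src.length - 8];  // get() also fills a new array
        final ByteBuffer buffer = ByteBuffer.wrap(src);
        buffer.position(8);
        buffer.get(dest);
        return dest;
    }
}
```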
I think I'm convinced that having added functions inside ValueAndTimestampSer .. here is a better approach.
This is related to the first comment in this PR: I think we can add the function in this class, and then this function can be re-written to:
final byte[] valueBytes = valueFrom(valueAndTimestamp);
final long timestamp = timestampFrom(valueAndTimestamp);
final V value = valueDeserializer.deserialize(topic, valueBytes);
return ValueAndTimestamp.make(value, timestamp);
Ditto for ser.
I'm not fully understanding the boolean flag here: when would we call the constructor without a valueSerde and call init(valueSerde) later? I'm thinking:
- If users pass in a serde via Materialized or via Stores, Streams could enforce them to always have the valueSerde passed in when constructing the valueAndTimestampSerde.
- If the user does not specify a serde, then Streams code internally could just always construct the valueAndTimestampSerde by passing the default serde as valueSerde when constructing it (i.e. in MeteredTimestampedKeyValueStore we can omit checking initialized).
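A hypothetical sketch of this suggestion: resolve the value serde at construction time with a default-serde fallback, so no "initialized" flag or later init(valueSerde) call is needed. Serde is reduced to an empty marker interface and all names are placeholders.

```java
// Placeholder sketch: the serde is always non-null after construction,
// removing the need for a boolean "initialized" flag.
public class ValueAndTimestampSerdeSketch<V> {
    interface Serde<T> { }

    private final Serde<V> valueSerde; // always non-null after construction

    ValueAndTimestampSerdeSketch(final Serde<V> userSerde, final Serde<V> defaultSerde) {
        // fall back to the default serde instead of deferring initialization
        this.valueSerde = userSerde != null ? userSerde : defaultSerde;
    }

    Serde<V> valueSerde() {
        return valueSerde;
    }
}
```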
Ack. (I guess this is some left over after code refactoring that slipped.)
Updated this. One open question about the order of refactoring remains. cc @guozhangwang
Actually, I do not understand why we need to use the deserialized valueTimestamp here instead of the timestamp from the cache entry: shouldn't they always be the same?
Both can be different.
LRUCacheEntry.timestamp() is the timestamp of the currently processed input record, and it's immutable. If the DSL computes a new timestamp for the output record (future work), this new timestamp is added into the value. Thus, on flush, we extract it from the value and set it in context.forward() to make it available to downstream processors.
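A minimal sketch of this distinction, assuming, for illustration only, an 8-byte big-endian timestamp prefix in the serialized value; class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

// The cache entry keeps the (immutable) input-record timestamp, while the
// serialized value may carry a newly computed output timestamp. On flush,
// the output timestamp from the value is the one to forward downstream.
public class FlushTimestamp {
    static long timestampToForward(final long cacheEntryTimestamp, final byte[] rawValueAndTimestamp) {
        // deliberately ignore the input-record timestamp from the cache entry
        // and extract the output timestamp serialized into the value
        return ByteBuffer.wrap(rawValueAndTimestamp).getLong();
    }
}
```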
Thanks for the explanation, that makes sense.
Note: with the latest rebase (i.e., the caching store rework), this logic is not part of this PR any longer, because it moves into TupleForwarder and will be added in a follow-up PR.
Force-pushed from 952cbb9 to fea07aa
Rebased and updated this. Call for review.
guozhangwang
left a comment
A couple of minor comments. Please feel free to merge after Jenkins.
    return true;
}

// @Override
Some cleanup needed? Ditto elsewhere.
}

static long timestamp(final byte[] rawValueAndTimestamp) {
    return LONG_DESERIALIZER.deserialize(
nit: this can be simplified to LONG_DESERIALIZER.deserialize(rawTimestamp(..)).
if (!enableCaching) {
    return inner;
}
return new CachingKeyValueStore(inner);
Note that after the rebase, we don't need a CachingTimestampedKeyValueStore any longer, but can reuse CachingKeyValueStore for "plain" and "timestamped" KV-store.
Merge of apache/trunk:
- KAFKA-7880: Naming worker thread by task id (apache#6275)
- improve some logging statements (apache#6078)
- KAFKA-7312: Change broker port used in testMinimumRequestTimeouts and testForceClose
- KAFKA-7997: Use automatic RPC generation in SaslAuthenticate
- KAFKA-8002: Log dir reassignment stalls if future replica has different segment base offset (apache#6346)
- KAFKA-3522: Add TimestampedKeyValueStore builder/runtime classes (apache#6152)
- HOTFIX: add igore import to streams_upgrade_test
- MINOR: ConsumerNetworkClient does not need to send the remaining requests when the node is not ready (apache#6264)
- KAFKA-7922: Return authorized operations in describe consumer group responses (KIP-430 Part-1)
- KAFKA-7918: Inline generic parameters Pt. III: in-memory window store (apache#6328)
- KAFKA-8012: Ensure partitionStates have not been removed before truncating. (apache#6333)
- KAFKA-8011: Fix for race condition causing concurrent modification exception (apache#6338)
- KAFKA-7912: Support concurrent access in InMemoryKeyValueStore (apache#6336)
- MINOR: Skip quota check when replica is in sync (apache#6344)
- HOTFIX: Change header back to http instead of https to path license header test (apache#6347)
- MINOR: fix release.py script (apache#6317)
- MINOR: Remove types from caching stores (apache#6331)
- MINOR: Improve logging for alter log dirs (apache#6302)
- MINOR: state.cleanup.delay.ms default is 600,000 ms (10 minutes). (apache#6345)
- MINOR: disable Streams system test for broker upgrade/downgrade (apache#6341)
…che#6152) Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
Part of KIP-258.
This is mostly internal changes; however, it depends on public interfaces that are added via #6151. This PR contains some duplication with #6151 to make it compile:
Also some refactoring of the three caching stores to allow less code for the newly added session store. Similar for the logging store.
The actual change of this PR is to add