KAFKA-3522: Add TimestampedWindowStore builder/runtime classes#6173
Call for review @guozhangwang @bbejeck @vvcephei @ableegoldman
Force-pushed from 06c3ace to 7f58b88
Since the new classes are static maybe they could go in their own package along with the other pre-existing decorators? Probably not on this PR but in a follow-up.
I am open to adding new packages -- @vvcephei was suggesting this too (for example, for the RocksDB classes). I would prefer to do this in a follow-up PR (it's internal anyway).
Retest this please
I'm sorry; getting a little cross-eyed here...
It looks like we're putting newValue and oldRawValue in a new FlushEntry instance here for the sole purpose of extracting them back out and discarding the FlushEntry object immediately following.
What am I missing here?
> discarding the FlushEntry object immediately following.
We don't discard it. It's used in the next lines:
if (flushEntry.value != null || flushEntry.oldValue != null) {
    flushListener.apply(
        key,
        flushEntry.value,
        flushEntry.oldValue,
        entry.entry().context().timestamp());
}
Maybe I misunderstand your question?
Yeah, that's what I meant. We put the new and old value in, and then immediately take them back out again, and never do anything else with the flushEntry. It just seems like we could skip the middle-man and directly deserialize the values and pass them to the flushListener.
But it's also no big deal... I was just curious if I missed something.
Well, as you mentioned, we pass objects into flushEntry() and it creates a FlushEntry with serialized data. To share code and to apply the same pattern for all stores, it makes sense IMHO to push the serialization work into the flushEntry() method -- the caller should be agnostic to whether flushEntry() modified the data or not.
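For readers following the FlushEntry discussion, here is a minimal sketch of the pattern being argued for: the shared flush path always goes through a flushEntry() hook, and a subclass may override the hook to convert the values. All class names below (FlushEntrySketch, PlainStore, ConvertingStore) and the upper-casing "conversion" are hypothetical stand-ins, not the classes touched by this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

final class FlushEntrySketch {

    /** Hypothetical holder for the new and old value handed to the listener. */
    static final class FlushEntry<V> {
        final V value;
        final V oldValue;

        FlushEntry(final V value, final V oldValue) {
            this.value = value;
            this.oldValue = oldValue;
        }
    }

    /** Hypothetical listener, shaped like the four-argument apply() call quoted above. */
    interface FlushListener<K, V> {
        void apply(K key, V newValue, V oldValue, long timestamp);
    }

    /** Base store: the hook passes both values through unchanged. */
    static class PlainStore {
        FlushEntry<String> flushEntry(final String newValue, final String oldValue) {
            return new FlushEntry<>(newValue, oldValue);
        }

        /** Shared caller: it does not need to know whether the hook changed the data. */
        final void flush(final String key, final String newValue, final String oldValue,
                         final long timestamp, final FlushListener<String, String> listener) {
            final FlushEntry<String> entry = flushEntry(newValue, oldValue);
            if (entry.value != null || entry.oldValue != null) {
                listener.apply(key, entry.value, entry.oldValue, timestamp);
            }
        }
    }

    /** Variant store: upper-casing stands in for whatever conversion a real store needs. */
    static final class ConvertingStore extends PlainStore {
        @Override
        FlushEntry<String> flushEntry(final String newValue, final String oldValue) {
            return new FlushEntry<>(convert(newValue), convert(oldValue));
        }

        private static String convert(final String v) {
            return v == null ? null : v.toUpperCase(Locale.ROOT);
        }
    }

    /** Runs three flushes and records what the listener saw. */
    static List<String> demo() {
        final List<String> seen = new ArrayList<>();
        final FlushListener<String, String> listener =
            (k, nv, ov, ts) -> seen.add(k + ":" + nv + ":" + ov + "@" + ts);
        new PlainStore().flush("a", "new", "old", 10L, listener);
        new ConvertingStore().flush("a", "new", null, 20L, listener);
        new ConvertingStore().flush("a", null, null, 30L, listener); // both null: listener is skipped
        return seen;
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

The trade-off raised in the review is visible here: the FlushEntry object is short-lived, but it lets flush() stay identical for every store variant.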
Force-pushed from 2095f23 to 9d028c4
init() is just moved from below.
Need to access this in the subclass when overriding the new initStoreSerde() method.
Note: we can reuse the existing CachingWindowStore (thanks to our refactoring).
Force-pushed from 9d028c4 to c24cc3f
Rebased this. Call for review @guozhangwang @bbejeck @vvcephei @ableegoldman
Force-pushed from c24cc3f to b03a811
if (valueAndTimestamp != null) {
    changeLogger.logChange(key, rawValue(valueAndTimestamp), timestamp(valueAndTimestamp));
} else {
    changeLogger.logChange(key, null);
This is not specifically related to this line of code, but reading it made me think about a related topic: when we call kvStore.delete and sessionStore.remove, we pass null as the ValueAndTimestamp, and hence the timestamp field is also null. In windowStore, by contrast, we do not have a delete API, but users can call put(k, null) or put(k, null, timestamp), in which case the timestamp field is actually not null. Hence there's a discrepancy between the first two and the latter one, right?
WindowStore#put(k, null, timestamp) is more or less the same as put(k, null) (the latter actually calls the former and uses context.timestamp() as the third parameter) -- note that the timestamp that is passed is the windowStartTimestamp (not the timestamp that is associated with the value for the timestamped store).
In fact, we plan to deprecate and remove put(k, null), because it does not make sense semantically to put something into a window store without specifying the window start timestamp (and context.timestamp() is not a good default value). Thus, for put(k, null, timestamp) the provided timestamp would be ignored anyway.
Also note that for a timestamped store, the call would be put(key, valueAndTimestamp, windowStartTimestamp), and the timestamp in the value is not related to the window start timestamp (which will be part of the key in the store).
I agree that calling put(key, null, timestamp) to delete a window would be weird. However, in the DSL we would never delete a window anyway and don't need this. Not sure if PAPI users would want a proper remove(key) method.
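To make the two timestamps in this explanation concrete, here is a small in-memory sketch. It is not the Kafka Streams implementation: WindowPutSketch, Store, WindowKey and ValueAndTs are hypothetical names, the contextTimestamp field stands in for context.timestamp(), and treating a null value as "remove the entry for that window" is an assumption made only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class WindowPutSketch {

    /** Hypothetical stand-in for a value paired with its own record timestamp. */
    static final class ValueAndTs {
        final long value;
        final long timestamp;

        ValueAndTs(final long value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** The window start timestamp is part of the store key, not of the value. */
    static final class WindowKey {
        final String key;
        final long windowStart;

        WindowKey(final String key, final long windowStart) {
            this.key = key;
            this.windowStart = windowStart;
        }

        @Override
        public boolean equals(final Object o) {
            if (!(o instanceof WindowKey)) {
                return false;
            }
            final WindowKey other = (WindowKey) o;
            return key.equals(other.key) && windowStart == other.windowStart;
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, windowStart);
        }
    }

    static final class Store {
        private final Map<WindowKey, ValueAndTs> data = new HashMap<>();
        private long contextTimestamp; // stand-in for context.timestamp()

        void setContextTimestamp(final long timestamp) {
            this.contextTimestamp = timestamp;
        }

        /** Shortcut variant: delegates and uses the context timestamp as window start. */
        void put(final String key, final ValueAndTs value) {
            put(key, value, contextTimestamp);
        }

        /** Assumption for this sketch: a null value removes the entry of that window. */
        void put(final String key, final ValueAndTs value, final long windowStartTimestamp) {
            final WindowKey windowKey = new WindowKey(key, windowStartTimestamp);
            if (value == null) {
                data.remove(windowKey);
            } else {
                data.put(windowKey, value);
            }
        }

        ValueAndTs fetch(final String key, final long windowStartTimestamp) {
            return data.get(new WindowKey(key, windowStartTimestamp));
        }
    }

    public static void main(final String[] args) {
        final Store store = new Store();
        store.put("k", new ValueAndTs(5L, 17L), 10L); // window starts at 10, record timestamp is 17
        System.out.println(store.fetch("k", 10L).timestamp);
    }
}
```

In this sketch the value timestamp (17) and the window start timestamp (10) never interact, which is the point made above about the timestamped store.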
I am just adding my 2 cents here. I understand what you are saying @mjsax, but I think we should have a follow-up PR to update the docs to fully explain the semantic operations and differences between (kvStore, sessionStore) and the windowStore.
We have many gaps in store documentation, and I agree it would be worth doing a follow-up for this.
I also think I understand now why WindowStore#put(k, v) was added in the first place. For stream-stream joins, we use windowed stores (with "allow duplicates" enabled) to store each individual record, and thus there is not really a notion of a window (we use it more like a key-value store that allows for time-range queries plus a retention time), and we use the record timestamp as the "window start timestamp" (i.e., we can use this shortcut method). I still think we should remove it, though.
Thanks for the explanation @mjsax. Yes, we were trying to "reuse" the window store for the stream-stream windowed join, which, in hindsight, is not a very good design.
I think we do not need to make any code changes atm (as for doc changes maybe we can track those as a separate ticket / PR).
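The stream-stream join usage described in this thread (a store that keeps duplicates, is indexed by record timestamp, supports time-range queries, and drops old data after a retention time) can be sketched with plain collections. This is only an illustration under simplifying assumptions: JoinBufferSketch is a hypothetical name, and expiry here is done lazily per key on put(), which is not how the real segmented stores work.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class JoinBufferSketch {

    // key -> (record timestamp -> all values seen at that timestamp, duplicates kept)
    private final Map<String, TreeMap<Long, List<String>>> data = new HashMap<>();
    private final long retentionMs;
    private long maxTimestamp = Long.MIN_VALUE;

    JoinBufferSketch(final long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Stores the record under its own timestamp; no real notion of a window is involved. */
    void put(final String key, final String value, final long recordTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, recordTimestamp);
        final TreeMap<Long, List<String>> perKey = data.computeIfAbsent(key, k -> new TreeMap<>());
        perKey.computeIfAbsent(recordTimestamp, t -> new ArrayList<>()).add(value);
        // Simplified retention: drop this key's entries older than (maxTimestamp - retentionMs).
        perKey.headMap(maxTimestamp - retentionMs, false).clear();
    }

    /** Time-range query: all values for the key with timeFrom <= timestamp <= timeTo. */
    List<String> fetch(final String key, final long timeFrom, final long timeTo) {
        final List<String> result = new ArrayList<>();
        final TreeMap<Long, List<String>> perKey = data.get(key);
        if (perKey != null && timeFrom <= timeTo) {
            for (final List<String> values : perKey.subMap(timeFrom, true, timeTo, true).values()) {
                result.addAll(values);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final JoinBufferSketch buffer = new JoinBufferSketch(100L);
        buffer.put("k", "a", 10L);
        buffer.put("k", "b", 10L); // same key and timestamp: kept as a duplicate
        buffer.put("k", "c", 50L);
        System.out.println(buffer.fetch("k", 0L, 20L));
    }
}
```

A join would call fetch(key, ts - before, ts + after) on the other side's buffer for each incoming record; that lookup is the "time-range query" mentioned above.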
verifyWindowedKeyValue(a.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1");
verifyWindowedKeyValue(b.next(), new Windowed<>(keyB, new SessionWindow(0, 0)), "1");
verifyWindowedKeyValue(a.next(), new Windowed<>(keyA, new SessionWindow(0, 0)), "1".getBytes());
Hmm.. why move the getBytes() from internal function to all its callers?
Force-pushed from 4279841 to 458935f
Pushed one more commit to clean up the code.
Known flaky tests failed:
Retest this please.
checkThrowsUnsupportedOperation(store::flush, "flush()");
checkThrowsUnsupportedOperation(() -> store.put("1", ValueAndTimestamp.make(1L, 1L), 1L), "put()");
checkThrowsUnsupportedOperation(() -> store.put("1", ValueAndTimestamp.make(1L, 1L)), "put()");
Super nit: maybe add a comment here that this is for put(key, value), as it took me a few seconds to realize the difference between this line and the one above. This comment is subjective though, so feel free to ignore.
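As an illustration of what this nit is about, here is a self-contained sketch of such a guard test with the two put() overloads labelled distinctly. The ReadOnlyStore class and the body of checkThrowsUnsupportedOperation are hypothetical; only the helper's name and call shape are taken from the hunk above.

```java
final class ReadOnlyGuardSketch {

    /** Hypothetical read-only facade: every mutating call is rejected. */
    static final class ReadOnlyStore {
        void flush() {
            throw new UnsupportedOperationException("flush() is not supported");
        }

        void put(final String key, final Long value) {
            throw new UnsupportedOperationException("put(key, value) is not supported");
        }

        void put(final String key, final Long value, final long windowStartTimestamp) {
            throw new UnsupportedOperationException("put(key, value, windowStartTimestamp) is not supported");
        }
    }

    /** Fails unless the action throws UnsupportedOperationException. */
    static void checkThrowsUnsupportedOperation(final Runnable action, final String name) {
        try {
            action.run();
        } catch (final UnsupportedOperationException expected) {
            return; // this is the behavior under test
        }
        throw new AssertionError(name + " should have thrown UnsupportedOperationException");
    }

    /** Runs all guards and returns how many were checked. */
    static int runChecks() {
        final ReadOnlyStore store = new ReadOnlyStore();
        checkThrowsUnsupportedOperation(store::flush, "flush()");
        // Three-argument overload: explicit window start timestamp.
        checkThrowsUnsupportedOperation(() -> store.put("1", 1L, 1L), "put(key, value, windowStartTimestamp)");
        // Two-argument overload: put(key, value).
        checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put(key, value)");
        return 3;
    }

    public static void main(final String[] args) {
        System.out.println(runChecks() + " guards verified");
    }
}
```

Giving each overload its own label also makes a failure message point at the exact method that stopped throwing.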
private final List<KeyValueIterator<Windowed<String>, Long>> iters = new ArrayList<>(7);
private final List<KeyValueIterator<Windowed<String>, ValueAndTimestamp<Long>>> timestampedIters = new ArrayList<>(7);
private WindowStoreIterator<Long> windowStoreIter;
private WindowStoreIterator<ValueAndTimestamp<Long>> timestampedWindowStoreIter;
nit: this is only used in one line for an assertion; could we use something else?
I merged it with windowStoreIter -- we "lose" generics and need to add a suppress warning, but with mocking we don't really have type safety anyway, so I think that is fine.
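A rough sketch of the trade-off described here, using only the JDK: one raw-typed iterator field serves two differently parameterized return types, at the cost of an unchecked-conversion warning that has to be suppressed. The interfaces PlainFetch and TimestampedFetch are hypothetical stand-ins for the mocked stores, and Map.Entry stands in for ValueAndTimestamp.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;

final class SharedIteratorSketch {

    /** Hypothetical stand-in for a store returning plain values. */
    interface PlainFetch {
        Iterator<Long> fetch();
    }

    /** Hypothetical stand-in for a store returning value-and-timestamp pairs. */
    interface TimestampedFetch {
        Iterator<Map.Entry<Long, Long>> fetch();
    }

    // One shared test double, declared raw so it can serve both element types.
    @SuppressWarnings("rawtypes")
    private final Iterator sharedIter = Collections.emptyIterator();

    // The raw-to-parameterized conversion is unchecked, hence the suppression.
    @SuppressWarnings("unchecked")
    PlainFetch plain() {
        return () -> sharedIter;
    }

    @SuppressWarnings("unchecked")
    TimestampedFetch timestamped() {
        return () -> sharedIter;
    }

    public static void main(final String[] args) {
        final SharedIteratorSketch sketch = new SharedIteratorSketch();
        final Object first = sketch.plain().fetch();
        final Object second = sketch.timestamped().fetch();
        System.out.println(first == second); // both calls hand back the same instance
    }
}
```

This is acceptable for an identity assertion in a test, as argued above, but the same trick in production code would hide real type errors.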
Force-pushed from dc73092 to 5a3613b
@mjsax Thanks for the updates! LGTM.
Known flaky test failed: https://issues.apache.org/jira/browse/KAFKA-8059 Retest this please.
Merged #6173 into trunk
* warn-apache-kafka/trunk: (41 commits)
  MINOR: Avoid double null check in KStream#transform() (apache#6429)
  KAFKA-7944: Improve Suppress test coverage (apache#6382)
  KAFKA-3522: add missing guards for TimestampedXxxStore (apache#6356)
  MINOR: Change Trogdor agent's cleanup executor to a cached thread pool (apache#6309)
  KAFKA-7976; Update config before notifying controller of unclean leader update (apache#6426)
  KAFKA-7801: TopicCommand should not be able to alter transaction topic partition count
  KAFKA-8091; Wait for processor shutdown before testing removed listeners (apache#6425)
  MINOR: Update delete topics zk path in assertion error messages
  KAFKA-7939: Fix timing issue in KafkaAdminClientTest.testCreateTopicsRetryBackoff
  KAFKA-7922: Return authorized operations in Metadata request response (KIP-430 Part-2)
  MINOR: Print usage when parse fails during console producer
  MINOR: fix Scala compiler warning (apache#6417)
  KAFKA-7288; Fix check in SelectorTest to wait for no buffered bytes (apache#6415)
  KAFKA-8065: restore original input record timestamp in forward() (apache#6393)
  MINOR: cleanup deprectaion annotations (apache#6290)
  KAFKA-3522: Add TimestampedWindowStore builder/runtime classes (apache#6173)
  KAFKA-8069; Fix early expiration of offsets due to invalid loading of expire timestamp (apache#6401)
  KAFKA-8070: Increase consumer startup timeout in system tests (apache#6405)
  KAFKA-8040: Streams handle initTransactions timeout (apache#6372)
  KAFKA-7980 - Fix timing issue in SocketServerTest.testConnectionRateLimit (apache#6391)
  ...
…e#6173) Add TimestampedWindowStore builder/runtime classes Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <mjsax@apache.org>, John Roesler <john@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
Part of KIP-258.
This PR adds