KAFKA-6455: Session Aggregation should use window-end-time as record timestamp #6645
Conversation
Replacing Stores.sessionStoreBuilder with a call to new SessionStoreBuilder to be able to set this newly introduced flag to true. It's not super clean, but I did not have a better idea to tackle this (more details below).
IIUC, you do not want to expose the flag in the public API. To make it super clean, you would need an internal factory method somewhere that is called here. Also the public factory method in Stores would then need to call the internal factory method with the flag set to false.
It's not exposed. Stores does set the flag hardcoded to false and SessionStoreBuilder is not part of public API. Still, it seems not clean for internal code. It basically leaks DSL into PAPI, but stores should be DSL agnostic...
FWIW, I think calling a constructor is just as clean as calling a static factory method. The flag itself is a little suspicious (although it may be unavoidable), but (IMO) calling the constructor is uncontroversial.
I think @mjsax's original comment is about whether passing this flag to the store builder, and hence to the caching store, is the most elegant way.
I'm actually wondering if it is really necessary -- see my other comment below.
By default, the new flag is set to false for backward compatibility.
We switch to the new semantics only if the SessionStore is used by the DSL, to preserve backward compatibility.
Update the test to also compare the result record timestamp
If we merge two sessions, we use the session-end-timestamp on delete for the smaller session now.
Hmm... Does this say that k1@0/0 was both created and deleted at time 0?
Maybe -- I was not really sure about this one. We never discussed how deletes should be handled. If we don't use the session-window end-timestamp, it seems we might reintroduce non-determinism -- not sure.
This is a very tricky subject, but it seems like the deletes should "happen" at the same time as the update. This would be the window-end time of the final merged window. In that case, we should actually not do any mutation while we're merging, but instead collect all the stuff to delete, and delete it at the end, while we also issue the update. (Since we wouldn't know the timestamp to use until after the merging is done).
Actually, this would also let us fix https://issues.apache.org/jira/browse/KAFKA-8318 , and opens up the possibility of doing a bulk/batch update to the state store.
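The "collect all the stuff to delete, and delete it at the end" idea could look like the following sketch. This is an assumed design, not the actual Kafka Streams code; the `Window` and `Emitted` types are simplified stand-ins. The point is that tombstones are only emitted after merging finishes, so they can all carry the merged window's end timestamp, which is unknown until then.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the "collect, then delete" idea (assumed design, not the actual
// Kafka Streams implementation): while merging overlapping sessions we only
// collect the windows to remove; the tombstones and the final update are
// emitted afterwards, stamped with the merged window's end time.
public class DeferredSessionMerge {
    record Window(long start, long end) {}
    record Emitted(String type, Window window, long timestamp) {}

    static List<Emitted> mergeAll(List<Window> overlapping) {
        List<Window> toDelete = new ArrayList<>();
        long start = Long.MAX_VALUE, end = Long.MIN_VALUE;
        for (Window w : overlapping) {          // no mutation while merging
            toDelete.add(w);
            start = Math.min(start, w.start());
            end = Math.max(end, w.end());
        }
        List<Emitted> out = new ArrayList<>();
        for (Window w : toDelete) {             // tombstones use the final end time
            out.add(new Emitted("delete", w, end));
        }
        out.add(new Emitted("update", new Window(start, end), end));
        return out;
    }

    public static void main(String[] args) {
        for (Emitted e : mergeAll(List.of(new Window(0, 3), new Window(2, 7)))) {
            System.out.println(e);
        }
    }
}
```

Batching the emissions this way is also what would make a bulk state-store update possible.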
Not sure if I understand the connection to 8318? Why do we need the merged-window end-timestamp to fix it? Also not sure how bulk/batch updates relate?
Also curious what others think. \cc @guozhangwang @bbejeck @ableegoldman @cadonna @abbccdda
Hmm, I'm going to be a PITA and change my mind on this. Sorry, @bbejeck ...
After some further reflection, here's what I'm thinking:
Session windows have this lifecycle: creation, multiple updates, and deletion. Creation has always taken place effectively at window-end time, since when a window is created, start==end==record.timestamp.
Previously, updates to a window were just assigned the timestamp of the event that caused the update. Let's say you have a count aggregation, you have some window [0,5] with a count of 2, and you get some more input events at times 5, 3, 4; then your sequence of result updates is 3 at time 5, 4 at time 3, and 5 at time 4. This is semantically problematic, because the timestamps tell you these results are "out of order" and that the "most recent" count is actually 3 :( .
What @mjsax is proposing here is to "pin" all updates to a window to window-end time. Then, the result updates for our example is just 3 at time 5, 4 at time 5, 5 at time 5. This is totally fine, since, in the case of identical timestamps, offset order is the tie-breaker. Therefore, the "most recent" count is (correctly) 5.
It's worth noting that in general, we get an equally correct sequence of window updates as long as the update times don't go backwards. Just save this thought for a minute.
Now, we come to deletes. Arguably, the delete is just another kind of update. I think this is where Matthias's head was originally at. The same logic applies, if the delete timestamp is equal to the update timestamps and the create timestamp, then offset order is the tie-breaker. Since we delete the window after the creation and updates, the final state of the window is (correctly) "deleted".
Unlike the creates/updates, though, the delete is "caused" by an event with a timestamp after the window-end time. This is what was tripping me up. It seems fine to report a window update time as the "high watermark" time of all the window updates so far (in the case of disordered events), but it seems weird to report a window update (in this case, the delete) time as "in the past", from the perspective of the event that caused it. That's why I was thinking that we should use the later timestamp, to indicate that the delete was caused at that later time. This is also correct from a time-semantics POV, because it preserves the correct order, that the delete comes after the creates/updates.
So, this is the punchline: both approaches result in correct time semantics. The only difference is that using the causing-event timestamp for deletes reflects the provenance of the delete more accurately, but I don't think this fact is actually useful for anything. Given that we're already pinning the create and update timestamps to the window-end time, I'm thinking we should stick with Matthias's original proposal and use the same timestamp for the "final" update (aka, the delete).
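The "pinning" behavior from the counting example above can be sketched as follows. This is an illustrative model, not Kafka Streams code; the `Update` record and `apply` helper are hypothetical. Each emitted update carries the window-end timestamp, so out-of-order input still produces a monotonic update stream.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not Kafka Streams code): emit session-window count
// updates with the timestamp pinned to the window-end time, so that
// out-of-order input still yields a non-decreasing timestamp sequence.
public class PinnedUpdates {
    // One emitted update: the new count and the timestamp attached to it.
    record Update(long count, long timestamp) {}

    // Window with the given end time and an existing count; apply events in
    // arrival order, pinning every update's timestamp to the window end.
    static List<Update> apply(long windowEnd, long startCount, long[] eventTimes) {
        List<Update> updates = new ArrayList<>();
        long count = startCount;
        long end = windowEnd;
        for (long ts : eventTimes) {
            end = Math.max(end, ts);             // the window end only grows
            count++;
            updates.add(new Update(count, end)); // pin to window-end time
        }
        return updates;
    }

    public static void main(String[] args) {
        // Window [0,5] with count 2; events arrive at times 5, 3, 4.
        for (Update u : apply(5L, 2L, new long[]{5, 3, 4})) {
            System.out.println(u.count() + " @ " + u.timestamp());
        }
    }
}
```

With identical timestamps, offset order breaks the tie, so the last update (count 5) is correctly the "most recent" one.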
@vvcephei Thanks for the detailed analysis! I think I agree with you and @mjsax on the approach, just to clarify my understanding further:
Today, even if we are not merging two session windows, a single session window's update is treated as a delete followed by an update. I think this is what https://issues.apache.org/jira/browse/KAFKA-8318 is reporting.
Now the logic would become that we use max(ts, window_end_time) for updates, hence:
1. With an update whose ts is smaller than the current window-end AND larger than the current window-start, the window start/end times would not change in the update record or in the changelog. In this case, we can consider optimizing it by not doing the delete followed by an update (i.e. KAFKA-8318).
   - but practically, with rare out-of-order data this would probably give a very small perf boost, right?
2. With an update whose ts is larger than the current window-end time, OR smaller than the window-start time, we would apply it as a delete of the original record (hence the tombstone ts == the old end-time) followed by a put of the new record with the new start/end-time == this record's ts.
3. Merging two windows is actually equal to updating the smaller window with a larger end time and updating the larger window with a smaller start time (of course, with a single record).
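The three cases above can be captured in a small sketch. This is an assumed model for illustration, not the actual store implementation; `Window`, `update`, `needsDeleteAndInsert`, and `merge` are hypothetical names.

```java
// Illustrative sketch (not the actual Kafka implementation) of the three
// cases discussed above for updating a session window [start, end] with a
// record at time ts, and for merging two overlapping sessions.
public class SessionWindowUpdate {
    record Window(long start, long end) {}

    // Updating one window: the bounds only change if ts falls outside them.
    static Window update(Window w, long ts) {
        return new Window(Math.min(w.start(), ts), Math.max(w.end(), ts));
    }

    // A delete+insert pair is only needed when the bounds actually change;
    // the in-bounds case is where KAFKA-8318's redundant tombstone arises.
    static boolean needsDeleteAndInsert(Window w, long ts) {
        return ts < w.start() || ts > w.end();
    }

    // Merging two sessions: two deletes, one insert spanning both windows.
    static Window merge(Window a, Window b) {
        return new Window(Math.min(a.start(), b.start()), Math.max(a.end(), b.end()));
    }

    public static void main(String[] args) {
        Window w = new Window(3, 8);
        System.out.println(update(w, 5));                 // bounds unchanged
        System.out.println(needsDeleteAndInsert(w, 5));   // false
        System.out.println(merge(w, new Window(7, 12)));  // spans [3, 12]
    }
}
```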
Is my understanding correct?
Hey @guozhangwang , just getting back to this thread...
For 1, yes, I think this is the situation, and I agree with your conclusion (under the assumption that out-of-order data is actually rare)
For 2, almost... for "followed by a put of the new record with the new start / end-time == this record's ts", do you mean the new end-time of the window? That's what we will use as the update timestamp. Note, the new end-time of the window might be the same as the old one, which brings us back to KAFKA-8318.
For 3, I'm afraid I don't follow. Looking at the sequence of updates, it's equivalent to deleting both the original windows and then adding a new one that spans both (and is semantically the merge result). Is this what you meant?
Sounds correct. Only (3 - merging two windows) should be two deletes (of the old windows) and one insert of the new merged window with [first-window-start-ts, second-window-end-ts].
> but practically, with rare out-of-order data this would probably give a very small perf boost, right?

It's not about performance, but just annoying to see unnecessary tombstones IMHO.
Yeah, for 3) I mean the same as you guys, i.e. we are updating two windows by deleting the old records, but with only a single new record. So logically it is like "new window replaces old window1" and also "new window replaces old window2".
For 2), the end/start-time of the window will only be the same as the old one if the update record's ts < end-time and ts > start-time. I think we are on the same page @vvcephei
Call for review @guozhangwang @bbejeck @vvcephei @ableegoldman @cadonna @abbccdda I am not 100% sure if the test coverage is sufficient. Please let me know if you think that more tests are required.
Created ticket for failing test. Java 11 passed. Retest this please.
If you set the flag to true here, you only test the DSL mode of the store, right? The processor API mode is not tested. I think you should test both modes. You could verify the expected timestamps for each mode in two separate test methods with each method instantiating the respective store. For all other expected results you could use the same test methods and run them twice, once with a store in DSL mode and once with the store in Processor API mode.
Could we use if/else if/else for this part of the logic? It could be more reader-friendly.
Not sure what you mean? Btw this code is generated by IntelliJ
I think it improves the style. It's internal-only anyway.
Potentially better to put this after L59, because dslUsage comes after cacheFunction in the parameter list, and after segmentInterval in the argument list.
Shall we reuse entry.entry().context() on L94 here, instead of calling it twice for timestamp()?
Updated this.
(force-pushed c707130 to e676ed0)
Rebased this to resolve merge conflicts.
This is unsafe with protected, non-final fields. Can you just throw an UnsupportedOperationException instead, since you really just need equals for testing?
Needed to add this back, because we use it for testing now... Thoughts?
We use To in hash-collections in our tests? I couldn't find where that happens.
We don't use it explicitly, but it's required for mocking. Eg. https://github.com/apache/kafka/pull/6645/files#diff-2eb683696aa96820098ed11941833ee3R36
Thanks for the context. I think that mock actually only depends on Equals. At least, I replaced this with throw new UnsupportedOperationException();, and the test still passes for me.
Note that it would be equally safe (and maybe a little more intuitive) to just make the fields final. This results in a bigger change, though, because it needs changes in ProcessorContextImpl. That might not be a bad thing.
It seems like the To class is mutable primarily so that ProcessorContextImpl can maintain an immutable reference to it. But there's really no increase in safety between an immutable reference to a mutable object vs a mutable reference to an immutable object. Arguably, the latter is a little better because To is a data container (so immutability is best), whereas ProcessorContextImpl is a full-blown behavioral object that already has a bunch of mutable references to things.
Ack about throwing an exception.
I am open to the other refactoring, but I won't do it in this PR. Feel free to do a MINOR PR directly or create a ticket. I just don't want to convolute this PR too much. Is this ok with you?
Absolutely, I was just offering an alternative to the exception.
dslUsage doesn't seem like the right name for this. It seems like "what it does" is more important than "what it's for". In this case, it controls whether the window-end timestamp is used when flushing, instead of the last update timestamp. Maybe forwardWindowEndTimestamp?
should we also set the context timestamp to bytesKey.window().end()? It's a little confusing, since the flushed data seems to be partly influenced by the context, and partly influenced by the arguments to flushListener.
I see what you are saying. Atm, it does not make a difference, because the FlushListener calls:
context.forward(key, new Change<>(newValue, oldValue), To.all().withTimestamp(timestamp));
and this will set the context timestamp to the window-end timestamp before calling downstream processors.
As a matter of fact, I have the suspicion that we can actually remove the timestamp argument on the FlushListener -- it was added as part of KIP-258, but after some refactoring of our stores, I think we can change it back (my idea was to hold back until KIP-258 is finished). For this case, the problem resolves naturally. Would it be ok if we clean this up as a follow up (to be sure we can remove the timestamp parameter again)?
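The behavior under discussion can be illustrated with a simplified, self-contained sketch of a session-store flush listener that pins the forwarded timestamp to the window end. All types here (`SessionKey`, `Change`, `Forwarded`) are simplified stand-ins for the Kafka Streams internals, not the real classes.

```java
// Simplified sketch (assumed model, not Kafka Streams code): on cache
// flush, the change for a session window is forwarded with the window-end
// timestamp, regardless of the timestamp of the record that caused it.
public class SessionFlushSketch {
    record SessionKey(String key, long windowStart, long windowEnd) {}
    record Change<V>(V newValue, V oldValue) {}
    record Forwarded<V>(SessionKey key, Change<V> change, long timestamp) {}

    // On flush, pin the forwarded record's timestamp to the window end.
    static <V> Forwarded<V> onFlush(SessionKey key, V newValue, V oldValue) {
        return new Forwarded<>(key, new Change<>(newValue, oldValue), key.windowEnd());
    }

    public static void main(String[] args) {
        SessionKey k = new SessionKey("k1", 0L, 5L);
        Forwarded<Long> f = onFlush(k, 5L, 4L);
        System.out.println(f.timestamp());  // the window-end time, 5
    }
}
```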
> Would it be ok if we clean this up as a follow up (to be sure we can remove the timestamp parameter again)?
Of course! Thanks for looking at it.
Updated this.
Java 11 failed. Tracked in Jira. Java 8 passed. Retest this please.
guozhangwang
left a comment
I've had a meta question about the boolean flag, otherwise, lgtm.
I'm wondering if we have to pass in this boolean flag to CachingSessionStore or not; my understanding is that for caching session stores, whenever it has a flush listener and that listener is called, we should always use the window end timestamp, right?
For the DSL yes, but not for the general case. PAPI users should be allowed to define their own semantics IMHO.
If a PAPI user 1) adds a store, and then 2) casts that store to a CachedStateStore and calls setFlushListener, then when that listener is called, the timestamp is passed in and is still not controllable by the user, right? Or how could users specify which timestamp to pass in today?
Thinking about this once more, I actually believe we don't need this flag and we can push all the logic into the flush-listener. Let me update this PR and we can discuss afterwards.
This class must be immutable -- otherwise, using context.forward(..., To.) in combination with suppress() breaks, because suppress() buffers a reference to the context and assumes it's immutable.
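The immutability requirement can be demonstrated with a minimal example (not Kafka code): if a processor like suppress() buffers a reference to a mutable "To"-like object, a later in-place mutation silently rewrites the buffered value. Both classes here are hypothetical stand-ins.

```java
// Minimal demonstration (assumed model, not Kafka Streams code) of why a
// buffered-context pattern requires immutability.
public class MutableAliasing {
    // Mutable variant: withTimestamp() mutates in place and returns this.
    static class MutableTo {
        long timestamp;
        MutableTo withTimestamp(long ts) { this.timestamp = ts; return this; }
    }

    // Immutable variant: withTimestamp() returns a fresh instance.
    record ImmutableTo(long timestamp) {
        ImmutableTo withTimestamp(long ts) { return new ImmutableTo(ts); }
    }

    public static void main(String[] args) {
        MutableTo shared = new MutableTo().withTimestamp(5);
        MutableTo buffered = shared;            // suppress() keeps a reference
        shared.withTimestamp(42);               // a later forward() mutates it
        System.out.println(buffered.timestamp); // 42 -- the buffered value changed!

        ImmutableTo safe = new ImmutableTo(5);
        ImmutableTo kept = safe;                // buffering a reference is safe
        safe = safe.withTimestamp(42);          // creates a new instance instead
        System.out.println(kept.timestamp());   // still 5
    }
}
```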
Add a test for out-of-order data to make sure we set the correct timestamp
Add test for out-of-order data to make sure we set the correct timestamp
This class did not have any testing.
Ayayay. Thanks for adding it!
Updating this test to also check the timestamp -- also add out-of-order record to the test case.
Extend this test for caching and non-caching (to cover the SessionTupleForwarder and SessionCacheFlushListener code paths).
Also update this test to check the result timestamps
Update this test to check result timestamp, too.
Refactored this, and removed the "annoying" flag from the builder classes. Also updated a couple of test cases -- one test case exposes a bug about mutable RecordContext -- I think we don't need to backport this fix, because it's only a problem if …
vvcephei
left a comment
Thanks for the update. Nice work on getting rid of that flag!
This class is unsafe for hashing, because Headers is mutable. Do you need to store it in a hash-collection?
Ah. I forgot about headers. Don't think we need it. Will revert it.
With the new flusher/forwarder, do we need this still, or can we just roll this part of the diff back completely?
Bumping this conversation...
bbejeck
left a comment
Chimed in on the question of delete timestamps, otherwise LGTM.
I'm inclined to agree that it seems the delete should happen at the same time as the update, meaning that we use the timestamp when the delete action occurs, but I could be wrong.
Failures seem related.
Ack. Race condition... forgot to increase the …
Updated this.
@vvcephei I put this fix in to make ProcessorRecordContext immutable -- however, after your comment about headers being mutable, I am wondering if this is an issue we need to address here or not. Maybe not, because it's a general issue and it's the user's responsibility to deep-copy headers if they are modified. Just wanted to double check and point it out.
Ok. If a test does not pass, this method is actually called. Hence, if we don't implement it, we don't get a "yellow" test failure with a proper error message, but the test crashes "red". \cc @vvcephei
Updated this with some minor cleanups.
(force-pushed e01a98e to aebb1cd)
Similar to #6667 -- we should obey sendOldValues.
Some test cleanup to get rid of the ResultCollector processor and reuse MockProcessor instead.
guozhangwang
left a comment
Made another pass on the latest three commits, and it LGTM.
bbejeck
left a comment
Took a look at latest commits, LGTM
(force-pushed aebb1cd to 4387329)
Rebased to resolve merge conflicts. Removed unused classes.
Merged to trunk. Thanks @mjsax
KAFKA-6455: Session Aggregation should use window-end-time as record timestamp (apache#6645)
For session-windows, the result record should have the window-end timestamp as record timestamp. Rebased to resolve merge conflicts. Removed unused classes TupleForwarder and ForwardingCacheFlushListener (replaced with TimestampedTupleForwarder, SessionTupleForwarder, TimestampedCacheFlushListener, and SessionCacheFlushListener).
Reviewers: John Roesler <john@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Boyang Chen <boyang@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>