MINOR: clarify why suppress can sometimes drop tombstones#6195
MINOR: clarify why suppress can sometimes drop tombstones#6195guozhangwang merged 2 commits intoapache:trunkfrom vvcephei:minor-clarify-suppression-dropping-tombstones
Conversation
|
A contributor asked me what Please take a look when you get the chance, @guozhangwang @mjsax @bbejeck @ableegoldman |
|
Retest this please. |
| @@ -66,7 +79,7 @@ Duration timeToWaitForMoreEvents() { | |||
| } | |||
|
|
|||
| boolean shouldSuppressTombstones() { | |||
There was a problem hiding this comment.
Consider renaming to safeToDropTombstones:
| boolean shouldSuppressTombstones() { | |
| boolean safeToDropTombstones() { |
There was a problem hiding this comment.
Ah, missed one. Thanks!
|
retest this, please |
guozhangwang
left a comment
There was a problem hiding this comment.
LGTM. Left a meta comment which is actually out of the scope of this PR.
| * idempotent and correct). We decided that the unnecessary tombstones would not be | ||
| * desirable in the output stream, though, hence the ability to drop them. | ||
| * | ||
| * A alternative is to remember whether a result has previously been emitted |
There was a problem hiding this comment.
Sounds good.
Actually I've seen the same situation for our current caching layer flushing logic as well: e.g. put(A, v) -> delete(A) and both only hit the cache layer. When flushing we tried to read the old value and found its null bytes, so we know nothing was flushed for A and nothing written to downstream before so we can skip putting a tombstone to underlying store as well as downstream.
For suppression buffer though, it is harder since you do not have an underlying store to fetch the old value, and of course reading the whole changelog to see if there's any updates on this key A costs you everything. But suppose we always have a persistent buffer, this may be an easier task.
There was a problem hiding this comment.
Thanks @guozhangwang ,
Yes, I think that's why the record cache keeps a set of "dirty keys", just like the suppression buffer. If you have a dirty-key, but don't find any value for it, then you know it was a delete and you can send a tombstone.
It is indeed harder for the suppression buffer, since we try not to completely duplicate all the data. I guess it's not all the data, but it is all the keys at least. For example, let's say we store the data as the heterogeneous type [nonTombstoneValueHasBeenEmittedFlag] | [nonTombstoneValueHasBeenEmittedFlag, valueToBeEmitted]. Then, when we get a tombstone, if the pre-existing value has nonTombstoneValueHasBeenEmittedFlag == true, we know we must emit the tombstone. If there is no value, or if nonTombstoneValueHasBeenEmittedFlag == false, we know that we don't need to send the tombstone. Upon sending the tombstone, we would simply delete the record from the store. Upon emitting a non-tombstone value, we would drop the value from the store and only store [nonTombstoneValueHasBeenEmittedFlag := true].
Not sure I would do this in memory, but as you point out, it could be an option for the persistent version. I guess if you think the whole "live" key space would fit in memory (plus one bit per key for the flag), then in-memory would work too.
There was a problem hiding this comment.
Right. But note that the current dirty-key in cache is not enough determining if we have, ever, write for a key to the underlying store which is not deleted yet: dirty-key only contains the dirty-keys since last flush, i.e. the key not in the dirty-key is only a necessary, not sufficient condition. And that's why we can only consider not writing the tombstone if the read on this key returns null-bytes, indicating nothing was there.
There was a problem hiding this comment.
Oh, right, I guess we would need to do something like what I described above, or some equivalent solution...
bbejeck
left a comment
There was a problem hiding this comment.
Thanks for the clarification, LGTM
|
Some core integration test failed... Retest this, please. |
|
I've ran the unit tests locally and it passed. |
|
Thanks, @guozhangwang ! |
* ak/trunk: MINOR: Update usage of deprecated API (apache#6146) KAFKA-4217: Add KStream.flatTransform (apache#5273) MINOR: Update Gradle to 5.1.1 (apache#6160) KAFKA-3522: Generalize Segments (apache#6170) Added quotes around the class path (apache#4469) KAFKA-7837: Ensure offline partitions are picked up as soon as possible when shrinking ISR (apache#6202) MINOR: In the MetadataResponse schema, ignorable should be a boolean KAFKA-7838: Log leader and follower end offsets when shrinking ISR (apache#6168) KAFKA-5692: Change PreferredReplicaLeaderElectionCommand to use Admin… (apache#3848) MINOR: clarify why suppress can sometimes drop tombstones (apache#6195) MINOR: Upgrade ducktape to 0.7.5 (apache#6197) MINOR: Improve IntegrationTestUtils documentation (apache#5664) MINOR: upgrade to jdk8 8u202 KAFKA-7693; Fix SequenceNumber overflow in producer (apache#5989) KAFKA-7692; Fix ProducerStateManager SequenceNumber overflow (apache#5990) MINOR: update copyright year in the NOTICE file. (apache#6196) KAFKA-7793: Improve the Trogdor command line. (apache#6133)
Reviewers: Jonathan Gordon, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Clarify when and why suppress would drop tombstone records. No behavioral change.
Committer Checklist (excluded from commit message)