KAFKA-8254: Pass Changelog as Topic in Suppress Serdes #6602
guozhangwang merged 8 commits into apache:trunk from vvcephei:KAFKA-8254-fix-suppress-serdes
|
@mjsax, do you mind taking a look at this, since we were having a conversation about it? The cleanest way to implement the change is to push the serialization logic into the store, where the changelog topic name is known. This also brings this store into alignment with the other state stores, which present a typed interface to the processors that use them and handle serialization internally. |
This is the only potentially weird thing... Rather than the store setting its own serdes during store init from the context, I'm letting the processor set them during processor init.
The reason for this is that the store is generically typed, and therefore does not know that it needs to wrap the value serde in a Change serde. We could tell it that its value is a Change, but it would be unnatural to define the buffer as a Store<K, Change<V>>.
Thus, we give the responsibility to the Processor during initialization, after it resolves the store, to make sure that the serdes are set before using it. This is more natural, since the Processor already knows that the store's value is a Change, so it can confidently wrap the context's serde and hand it off to the store.
It's also safe, because we know that no other caller will interact with the store's typed methods except the Processor (e.g., the store is (and will always be) inaccessible to IQ).
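To make the division of responsibility concrete, here is a minimal, self-contained sketch rather than the actual Streams code. Every type in it (`SketchSerde`, `SketchChange`, `RecordingSerde`, `SketchBuffer`, `SketchSuppressProcessor`) is a hypothetical stand-in, and the method name `setSerdesIfNull` is an assumption chosen for illustration. It shows a generically typed buffer that serializes against its own changelog topic, and a processor that wraps the context's value serde before handing it to the buffer.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a serde; like Kafka serializers, it receives a topic name.
interface SketchSerde<T> {
    byte[] serialize(String topic, T data);
}

// Hypothetical stand-in for Change<V>: a new/old value pair.
final class SketchChange<V> {
    final V newValue;
    final V oldValue;

    SketchChange(V newValue, V oldValue) {
        this.newValue = newValue;
        this.oldValue = oldValue;
    }
}

// Demo helper: remembers every topic name it was asked to serialize for.
final class RecordingSerde<T> implements SketchSerde<T> {
    final List<String> topics = new ArrayList<>();

    @Override
    public byte[] serialize(String topic, T data) {
        topics.add(topic);
        return String.valueOf(data).getBytes(StandardCharsets.UTF_8);
    }
}

// Generic buffer: it knows its changelog topic, but not that V happens to be a Change.
final class SketchBuffer<K, V> {
    private final String changelogTopic;
    private SketchSerde<K> keySerde;
    private SketchSerde<V> valueSerde;
    int bytesBuffered = 0;

    SketchBuffer(String changelogTopic) {
        this.changelogTopic = changelogTopic;
    }

    // Called by the processor during its init; serdes that are already set are kept.
    void setSerdesIfNull(SketchSerde<K> key, SketchSerde<V> value) {
        if (keySerde == null) keySerde = key;
        if (valueSerde == null) valueSerde = value;
    }

    // Typed interface: serialization happens inside the store, against the changelog topic.
    void put(K key, V value) {
        if (keySerde == null || valueSerde == null) {
            throw new IllegalStateException("serdes must be set before use");
        }
        bytesBuffered += keySerde.serialize(changelogTopic, key).length;
        bytesBuffered += valueSerde.serialize(changelogTopic, value).length;
    }
}

// The processor knows the value is a Change, so it wraps the context's value serde.
final class SketchSuppressProcessor<K, V> {
    private final SketchBuffer<K, SketchChange<V>> buffer;

    SketchSuppressProcessor(SketchBuffer<K, SketchChange<V>> buffer) {
        this.buffer = buffer;
    }

    void init(SketchSerde<K> contextKeySerde, SketchSerde<V> contextValueSerde) {
        SketchSerde<SketchChange<V>> changeSerde = (topic, change) -> {
            byte[] newBytes = contextValueSerde.serialize(topic, change.newValue);
            byte[] oldBytes = contextValueSerde.serialize(topic, change.oldValue);
            byte[] out = new byte[newBytes.length + oldBytes.length];
            System.arraycopy(newBytes, 0, out, 0, newBytes.length);
            System.arraycopy(oldBytes, 0, out, newBytes.length, oldBytes.length);
            return out;
        };
        buffer.setSerdesIfNull(contextKeySerde, changeSerde);
    }

    void process(K key, SketchChange<V> change) {
        buffer.put(key, change);
    }
}
```

In this sketch the buffer never learns that its value type is a change pair. It only requires that someone set the serdes before the first `put`, which is the trade-off described above.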
"but it would be unnatural to define the buffer as a Store<K, Change<V>>."
Not sure if I understand this argument. Can you elaborate?
Sure; right now, the buffer is a generic k/v store with special time-dependent semantics. There's nothing in the buffer implementation that requires the value to be a specific type.
At the same time, we have to deal with this whole "try harder to set the serdes if they didn't get set to begin with" business. The suppress operator (not the buffer) happens to require a very specific serde, the FullChangeSerde, which is actually an internal class, so we're basically guaranteed to get the wrong serde and get a runtime exception if we just try to use whatever serde is defined in the context. But this isn't the buffer's problem, it's the suppress processor's problem. So, that's why I chose to add this method to let the operator deal with its own business and keep the buffer ignorant of the specific value type.
While I was wrestling with this, I got some inspiration that can actually let us get rid of these runtime serde double-checks everywhere, but it's out of scope for this PR. Hopefully, we can remove this method later on, though, and just guarantee that the serdes are never null in all stores and operators.
This is to fix a warning about the switch being incomplete.
I added the Eviction class so we can pack up all three pieces (key, value, context) to give to the callback. We can't use ContextualRecord anymore, since it's a post-serialization container for the value.
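As a rough illustration of that container, the sketch below bundles a typed key, a typed value, and a record context for an eviction callback. The thread does not show the real class's fields or accessors, so `SketchRecordContext`, `SketchEviction`, `SketchEvictingBuffer`, and all of their members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the record context attached to a buffered record.
final class SketchRecordContext {
    final String topic;
    final int partition;
    final long offset;

    SketchRecordContext(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Packs the three pieces (key, value, context) handed to the eviction callback.
// Key and value stay typed here, unlike a post-serialization container.
final class SketchEviction<K, V> {
    private final K key;
    private final V value;
    private final SketchRecordContext recordContext;

    SketchEviction(K key, V value, SketchRecordContext recordContext) {
        this.key = key;
        this.value = value;
        this.recordContext = recordContext;
    }

    K key() { return key; }
    V value() { return value; }
    SketchRecordContext recordContext() { return recordContext; }
}

// Tiny buffer that hands every buffered entry to a callback, then clears itself.
final class SketchEvictingBuffer<K, V> {
    private final List<SketchEviction<K, V>> entries = new ArrayList<>();

    void put(K key, V value, SketchRecordContext context) {
        entries.add(new SketchEviction<>(key, value, context));
    }

    void evictAll(Consumer<SketchEviction<K, V>> callback) {
        for (SketchEviction<K, V> entry : entries) {
            callback.accept(entry);
        }
        entries.clear();
    }

    int size() { return entries.size(); }
}
```

A caller would pass a lambda such as `eviction -> forward(eviction.key(), eviction.value())`, where `forward` is likewise a placeholder for whatever the processor does with emitted records.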
just some on-the-side cleanup
resolving a warning about the raw types
|
Weird, core failed to compile... Retest this, please. |
|
Yep, trunk is broken at the moment. Submitted #6603 to fix it. |
|
rebased to pull in the fix |
|
Reported Java 11 test failure: kafka.admin.DeleteConsumerGroupsTest.testDeleteCmdAllGroups. Retest this, please. |
|
Test results are gone. Retest this, please. |
|
Rebased to fix merge conflicts. |
guozhangwang left a comment
The refactoring lgtm! I think it's a good idea to move the serde logic from the processor into the store (and only let the processor set the serdes).
I also checked the serde calls with changelogTopic, and the fix lgtm too.
|
retest this please |
|
The tests have a compile error. I'll fix it while I'm adding the test I described above. |
|
Thanks @vvcephei , other than the above clarification question I do not have further comments. cc for another review @abbccdda @ableegoldman @bbejeck @cadonna |
vvcephei left a comment
@guozhangwang , can you take one more look? I ran into some surprising cases where serdes weren't propagated when I wrote my tests to validate the serde handling of Suppress. I want to make sure what I did in KStreamImpl is actually ok.
  }

- public FullChangeSerde(final Serde<T> inner) {
+ private FullChangeSerde(final Serde<T> inner) {
Forcing all callers to use castOrWrap, which propagates null (aka, no serde is defined). See the usage in KTableImpl.
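For readers unfamiliar with the pattern, here is a minimal sketch of a private constructor paired with a null-propagating static factory. It is not the real `FullChangeSerde`: `MiniSerde`, `ChangePair`, and `ChangeSerdeSketch` are hypothetical types, and the factory signature is an assumption.

```java
import java.util.Objects;

// Hypothetical stand-in for a serde.
interface MiniSerde<T> {
    byte[] serialize(String topic, T data);
}

// Hypothetical stand-in for a new/old value pair.
final class ChangePair<T> {
    final T newValue;
    final T oldValue;

    ChangePair(T newValue, T oldValue) {
        this.newValue = newValue;
        this.oldValue = oldValue;
    }
}

final class ChangeSerdeSketch<T> implements MiniSerde<ChangePair<T>> {
    private final MiniSerde<T> inner;

    // Private: callers must go through castOrWrap, so null handling lives in one place.
    private ChangeSerdeSketch(MiniSerde<T> inner) {
        this.inner = Objects.requireNonNull(inner);
    }

    @SuppressWarnings("unchecked")
    static <T> ChangeSerdeSketch<T> castOrWrap(MiniSerde<?> serde) {
        if (serde == null) {
            return null; // propagate "no serde is defined" instead of wrapping null
        }
        if (serde instanceof ChangeSerdeSketch) {
            return (ChangeSerdeSketch<T>) serde; // already wrapped, do not wrap twice
        }
        return new ChangeSerdeSketch<>((MiniSerde<T>) serde);
    }

    MiniSerde<T> inner() {
        return inner;
    }

    @Override
    public byte[] serialize(String topic, ChangePair<T> data) {
        byte[] newBytes = inner.serialize(topic, data.newValue);
        byte[] oldBytes = inner.serialize(topic, data.oldValue);
        byte[] out = new byte[newBytes.length + oldBytes.length];
        System.arraycopy(newBytes, 0, out, 0, newBytes.length);
        System.arraycopy(oldBytes, 0, out, newBytes.length, oldBytes.length);
        return out;
    }
}
```

Because the factory returns null for a null input, a downstream "fall back to the context's serde if none was set" check still sees null and can do its job.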
    private final long offset;
    private final String topic;
    private final int partition;
    private final Headers headers;
some on-the-side cleanup of ProcessorRecordContext I encountered while writing my test.
- producedInternal.keySerde() != null ? producedInternal.keySerde() : keySerde,
- producedInternal.valueSerde() != null ? producedInternal.valueSerde() : valSerde,
+ producedInternal.keySerde(),
+ producedInternal.valueSerde(),
Why this rewrite? (Just curious.)
it's just no longer necessary, since we're already setting the key/value serdes if they were null in producedInternal on lines 395-400.
Sure. The question was about both changes in combination. Why is it better to split this out compared to just doing a one-liner?
    /**
     * Equality is implemented in support of tests, *not* for use in Hash collections, since this class is mutable.
     */
    @Deprecated
No need to deprecate an internal method. If we want to remove it, we should just do it.
We can't remove hashCode, since it's defined by Object. Marking it as deprecated isn't necessary, since any attempt to use it results in an exception, but it's just extra documentation that hashCode is not to be used on ProcessorRecordContext.
I implemented it because implementing equals without hashCode is a bug, and a very subtle one. But I wanted equals in support of the test validation. And we also can't provide a correct implementation of hashCode, since there is mutable state in this object. This all basically amounts to a flaw in Java: you can't define equality without implying that an object is stably hashable. Ideally, we could plug in a different notion of equality, but all the testing frameworks have a built-in dependency on equals. Throwing the exception and adding a deprecation is essentially a middle ground: we wind up with safe code that can also be used in tests.
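A minimal sketch of that middle ground, assuming a mutable timestamp field purely for illustration (`MutableContextSketch` is a hypothetical class, not `ProcessorRecordContext`): equals compares all fields so test assertions work, while hashCode refuses to run so the object cannot silently end up in a hash-based collection.

```java
import java.util.Objects;

final class MutableContextSketch {
    private long timestamp; // assumed mutable for this sketch
    private final long offset;
    private final String topic;
    private final int partition;

    MutableContextSketch(long timestamp, long offset, String topic, int partition) {
        this.timestamp = timestamp;
        this.offset = offset;
        this.topic = topic;
        this.partition = partition;
    }

    void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        MutableContextSketch that = (MutableContextSketch) o;
        return timestamp == that.timestamp
            && offset == that.offset
            && partition == that.partition
            && Objects.equals(topic, that.topic);
    }

    /**
     * Equality exists to support tests. A stable hash is impossible while the
     * object is mutable, so hashing is rejected instead of returning a value
     * that could change after insertion into a HashMap or HashSet.
     */
    @Override
    public int hashCode() {
        throw new UnsupportedOperationException("MutableContextSketch is unsafe for use in Hash collections");
    }
}
```

With this shape, an `assertEquals` on two instances works, while `new HashSet<>().add(context)` fails immediately rather than misbehaving later.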
What I meant by "remove" was to remove the override :)
Thanks for clarifying why you want to override and throw. But this does not mean that the method is deprecated... Using the annotation implies that we want to remove this override in the future, which is misleading. I would prefer to remove the annotation. cc @guozhangwang @bbejeck
I think that throwing the exception in the override is sufficient to guard against future bugs; as for documentation, I feel neutral either way.
We should still add JavaDocs and explain that we throw and why. But deprecation != documentation IMHO.
|
Thanks for the review, @mjsax ! I qualified the serdes, and I'll remember not to statically import them like that in the future. |
|
Thanks, @guozhangwang ! |
|
The cherry-pick to 2.2 was pretty smooth (1 conflict), but 2.1 has a lot of conflicts. If it's ok with you, @guozhangwang , I'll just send a PR for the 2.1 port. |
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
|
I've already cherry-picked to 2.2: as you said it was pretty trivial to resolve conflicts. |
|
Tried 2.1 and it was a bit of work.. could you send another PR? |
Cherry-picked from #6602 Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
* ak/trunk: (42 commits)
  KAFKA-8134: `linger.ms` must be a long
  KAFKA-7779; Avoid unnecessary loop iteration in leastLoadedNode (apache#6081)
  MINOR: Update Gradle to 5.4.1 and update its plugins (apache#6436)
  MINOR: improve Session expiration notice (apache#6618)
  KAFKA-8029: In memory session store (apache#6525)
  MINOR: In-memory stores cleanup (apache#6595)
  KAFKA-7862 & KIP-345 part-one: Add static membership logic to JoinGroup protocol (apache#6177)
  KAFKA-8254: Pass Changelog as Topic in Suppress Serdes (apache#6602)
  KAFKA-7903: automatically generate OffsetCommitRequest (apache#6583)
  KAFKA-8291 : System test fix (apache#6637)
  MINOR: Do not log retriable offset commit exceptions as errors (apache#5904)
  MINOR: Fix log message error of loadTransactionMetadata (apache#6571)
  MINOR: Fix 404 security features links (apache#6634)
  MINOR: Remove an unnecessary character from broker's startup log
  MINOR: Make LogCleaner.shouldRetainRecord more readable (apache#6590)
  MINOR: Remove implicit return statement (apache#6629)
  KAFKA-8237; Untangle TopicDeleteManager and add test cases (apache#6588)
  KAFKA-8227 DOCS Fixed missing links duality of streams tables (apache#6625)
  MINOR: reformat settings.gradle to be more readable (apache#6621)
  MINOR: Correct RestServerTest formatting
  ...
Conflicts:
  build.gradle
  settings.gradle