MINOR: fix bypasses in ChangeLogging stores #6266
bbejeck merged 2 commits into apache:trunk from vvcephei:fix-putall-bypass
Hey @ableegoldman @guozhangwang @mjsax @bbejeck, if some of you have a minute, can you take a quick look at this? I happened to notice during some other refactoring that the change-logging store layer sometimes bypasses the underlying store and instead calls across to a different layer. It seems unexpected that it should do so, and it might actually cause problems. There was one spot (in the windowed store) where the bypass is impossible to avoid, so I added a note there justifying why we bypass the underlying store. Thanks, -John
This change would make it impossible for underlying stores to implement an atomic update. At the least, it prohibits a performance optimization if the underlying store supports putIfAbsent natively.
It could cause concurrency bugs, if the underlying store were attempting to provide thread-safe access via IQ. I'm not too sure about this, though.
The prior logic splits one call (putIfAbsent) into two separate operations (get, followed by put). The underlying store has its own putIfAbsent method, which we would never call through to.
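The difference between the two shapes can be sketched roughly as follows. This is a simplified illustration, not the actual Kafka Streams code: `SimpleStore`, `InMemoryStore`, `ChangeLoggingStore`, and the `changelog` list are hypothetical stand-ins for the real store interfaces and the changelog writer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, minimal store interface (NOT the real KeyValueStore API).
interface SimpleStore {
    String get(String key);
    void put(String key, String value);
    String putIfAbsent(String key, String value);
}

// Hypothetical underlying store that counts calls to its native putIfAbsent.
class InMemoryStore implements SimpleStore {
    final Map<String, String> data = new HashMap<>();
    int nativePutIfAbsentCalls = 0;

    @Override public String get(String key) { return data.get(key); }
    @Override public void put(String key, String value) { data.put(key, value); }
    @Override public String putIfAbsent(String key, String value) {
        nativePutIfAbsentCalls++;
        return data.putIfAbsent(key, value);
    }
}

// Hypothetical change-logging wrapper showing both shapes side by side.
class ChangeLoggingStore {
    final SimpleStore inner;
    final List<String> changelog = new ArrayList<>(); // stand-in for the changelog topic

    ChangeLoggingStore(SimpleStore inner) { this.inner = inner; }

    void put(String key, String value) {
        inner.put(key, value);
        changelog.add(key + "=" + value);
    }

    // Prior shape: one call becomes get + put, so inner.putIfAbsent never runs.
    String putIfAbsentBypassing(String key, String value) {
        String previous = inner.get(key);
        if (previous == null) {
            put(key, value); // shares the logging code, but bypasses inner.putIfAbsent
        }
        return previous;
    }

    // Fixed shape: delegate to the underlying store, then log only if a write happened.
    String putIfAbsentDelegating(String key, String value) {
        String previous = inner.putIfAbsent(key, value);
        if (previous == null) {
            changelog.add(key + "=" + value);
        }
        return previous;
    }
}
```

In the delegating version, an underlying store that implements `putIfAbsent` atomically or more efficiently actually gets the chance to do so.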
It's unexpected for a "pass-through" layer to actually change the behavior and semantics of an operation.
Well, it's single-threaded, so it does not really matter too much. I understand that it is a change, but you mentioned:
"It could cause concurrency bugs, if the underlying store were attempting to provide thread-safe access via IQ. I'm not too sure about this, though."
That's why I was asking. This was not an issue in the existing code. I guess the code was written this way to "share" the part that does the actual changelog.logChange() call.
Thus, I agree that the change makes sense -- it was just your comment that confused me.
Yes, I agree. It also looked to me like the intent was to share the logging portion of the put call. Normally, this would be ideal, but in this case it seems to violate the expectations of store implementers.
It is true that Streams's processing is single-threaded. I should have been more explicit about the nature of the concurrency behavior I had in mind... Specifically, I was thinking about concurrent read access via IQ, but in retrospect, since IQ users don't have any visibility into the exact state of record processing, the concurrent read story should be identical whether Streams performs a single atomic operation, or two separate ones.
Therefore, I guess the concurrency thing is a non-issue. Sorry for the confusion.
In these cases, we're bypassing the underlying store for apparently no reason at all. It's not unreasonable to think that the underlying store could support more efficient versions of these operations.
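As a rough illustration of why delegating matters here, consider a bulk write (the branch name `fix-putall-bypass` suggests `putAll` was one of the affected operations). The sketch below is an assumption-laden simplification, not the real Kafka Streams classes: `BatchStore`, `CountingBatchStore`, and `LoggingBatchStore` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal store interface with a bulk write (NOT the real Kafka API).
interface BatchStore {
    void put(String key, String value);
    void putAll(Map<String, String> entries);
}

// Hypothetical underlying store that counts how it is called.
class CountingBatchStore implements BatchStore {
    final Map<String, String> data = new HashMap<>();
    int putCalls = 0;
    int putAllCalls = 0;

    @Override public void put(String key, String value) {
        putCalls++;
        data.put(key, value);
    }

    @Override public void putAll(Map<String, String> entries) {
        putAllCalls++;        // a real store might do a single batched write here
        data.putAll(entries);
    }
}

// Hypothetical change-logging wrapper.
class LoggingBatchStore {
    final BatchStore inner;
    final List<String> changelog = new ArrayList<>(); // stand-in for the changelog topic

    LoggingBatchStore(BatchStore inner) { this.inner = inner; }

    void put(String key, String value) {
        inner.put(key, value);
        changelog.add(key + "=" + value);
    }

    // Bypassing shape: loops over this layer's put, so inner.putAll is never used.
    void putAllBypassing(Map<String, String> entries) {
        for (Map.Entry<String, String> e : entries.entrySet()) {
            put(e.getKey(), e.getValue());
        }
    }

    // Delegating shape: one bulk call to the underlying store, then log each entry.
    void putAllDelegating(Map<String, String> entries) {
        inner.putAll(entries);
        for (Map.Entry<String, String> e : entries.entrySet()) {
            changelog.add(e.getKey() + "=" + e.getValue());
        }
    }
}
```

Both shapes log the same changes; only the delegating one lets the underlying store apply its own (possibly batched) implementation.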
In this case, as the comment explains, calling across to the other method is the best option for us. Really, it just demonstrates that this method should be deprecated (https://issues.apache.org/jira/browse/KAFKA-7928), at which point the problem goes away.
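A hedged sketch of what such an unavoidable bypass might look like is below. The rationale in the comments is an assumption on my part (that the changelog record needs the timestamp, which the wrapper cannot learn from the underlying store's untimestamped `put`); it is not quoted from the PR. All names (`SketchWindowStore`, `RecordingWindowStore`, `LoggingWindowStore`, `currentTimestamp`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical minimal windowed store interface (NOT the real WindowStore API).
interface SketchWindowStore {
    void put(String key, String value);                 // timestamp chosen implicitly
    void put(String key, String value, long timestamp); // timestamp given explicitly
}

// Hypothetical underlying store that records which overload was called.
class RecordingWindowStore implements SketchWindowStore {
    final List<String> calls = new ArrayList<>();

    @Override public void put(String key, String value) {
        calls.add("put(" + key + ")");
    }

    @Override public void put(String key, String value, long timestamp) {
        calls.add("put(" + key + ", " + timestamp + ")");
    }
}

// Hypothetical change-logging wrapper for the windowed case.
class LoggingWindowStore implements SketchWindowStore {
    private final SketchWindowStore inner;
    private final LongSupplier currentTimestamp;          // stand-in for the processing context
    final List<String> changelog = new ArrayList<>();     // stand-in for the changelog topic

    LoggingWindowStore(SketchWindowStore inner, LongSupplier currentTimestamp) {
        this.inner = inner;
        this.currentTimestamp = currentTimestamp;
    }

    @Override public void put(String key, String value) {
        // Assumed rationale: calling inner.put(key, value) would hide the timestamp
        // the inner store picks, so the wrapper could not log a timestamped record.
        // Calling across to the timestamped overload keeps store and changelog in sync.
        put(key, value, currentTimestamp.getAsLong());
    }

    @Override public void put(String key, String value, long timestamp) {
        inner.put(key, value, timestamp);
        changelog.add(key + "@" + timestamp + "=" + value);
    }
}
```

If the untimestamped method were removed from the interface, the wrapper would no longer need this special case.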
Good catch, LGTM
Some of the test failures are related.
Thanks, @bbejeck, I fixed it.
Rebased to resolve merge conflict.
mjsax left a comment:
LGTM (assuming Jenkins passes).
LGTM!
Thanks, all!
The change-logging stores should not bypass methods in underlying stores.
* MINOR: fix bypasses in ChangeLogging stores
* fix test
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <mjsax@apache.org>, Bill Bejeck <bbejeck@gmail.com>