KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"#11796
KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"#11796guozhangwang merged 25 commits intoapache:trunkfrom
Conversation
8b3780e to
b48f7d5
Compare
|
@ableegoldman , @guozhangwang , @mjsax this is the new PR for KIP-770(follow up from #11424 last 3-4 commits hold the new set of changes. I haven't added the StreamConfigUtils class, was thinking if i can do it on a separate ticket. WDYT? Also, the javadoc is pending at this moment. Will add once we are ok with the changes here. |
|
@guozhangwang , this PR is showing a failure in streams:compileTestJava.. It seems to be working on my local. |
|
Hello @vamossagar12 I checked out your branch, and run the I guess your branch was not rebased on top of |
b48f7d5 to
7a41243
Compare
|
@guozhangwang i had done a rebase before pushing and hadn't notice the issue. Not sure what went wrong there(maybe an oversight from me). Anyways i did another rebase and now i could see the error. Have fixed it and pushed again. |
|
@guozhangwang , now i see failures with raft/zk tests. that seems unrelated to this PR. |
guozhangwang
left a comment
There was a problem hiding this comment.
I only reviewed the code snippet around deprecated config logic, only nit comments regarding the log lines.
@ableegoldman could you also take another look?
There was a problem hiding this comment.
overriding {} to {} here could also be a bit confusing since it could be interpreted as the second override the first by some one. I'd suggest we just say Both deprecated config {} and new config {} are set, hence {} is ignored and the new config {} (value {}) is used.
There was a problem hiding this comment.
Yeah makes sense. I have updated it to use topology name along with the rest of log message. I guess that should be fine?
There was a problem hiding this comment.
Ditto here: we just say "Only deprecated config {} is set and hence it's value {} would be used; we suggest setting the new config {} instead since the deprecated config {} would be removed in the future."
|
Re-triggered jenkins. |
|
@vamossagar12 could you resolve the conflicts before I re-trigger jenkins again? |
Co-authored-by: A. Sophie Blee-Goldman <ableegoldman@gmail.com>
Co-authored-by: A. Sophie Blee-Goldman <ableegoldman@gmail.com>
Co-authored-by: A. Sophie Blee-Goldman <ableegoldman@gmail.com>
…ax buffer bytes size and then resizing
…ache size config overrides
@guozhangwang done. On my local, only one test failed in streams which is => org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest#shouldRestoreState |
|
Thanks @vamossagar12 . I've merged the PR, and please go ahead and mark the ticket / KIP as for 3.3.0. |
|
Thanks @guozhangwang ! |
ableegoldman
left a comment
There was a problem hiding this comment.
Thanks for the new PR @vamossagar12 ! Sorry I wasn't able to take a look earlier but I just gave it a quick pass now. I took the liberty of moving the #getTotalCacheSize method to the StreamsConfigUtils class myself since I'm doing a quick warning log PR in that part of the code anyways.
| createAndAddStreamThread(cacheSizePerThread, i); | ||
| createAndAddStreamThread(0L, 0L, i); | ||
| } | ||
| // Initially, all Stream Threads are created with 0 cache size and max buffer size and then resized here. |
There was a problem hiding this comment.
Why do we do it this way? ie rather than just computing the size upfront and creating the threads with that? I find this a bit confusing, mainly because I can't tell if there is a technical reason for doing it this way or it was just a design choice
| // and then resize them later | ||
| streamThread = createAndAddStreamThread(0L, 0L, threadIdx); | ||
| final int numLiveThreads = getNumLiveStreamThreads(); | ||
| resizeThreadCacheAndBufferMemory(numLiveThreads + 1); |
There was a problem hiding this comment.
I think this feedback got lost in the shuffle when we reverted the original PR, but this needs to be fixed -- the + 1 was only necessary in the old code because we resized the cache before adding the new thread/computing the thread count. Now that we first create the new thread, the numLiveThreads count should accurately reflect the number of current threads, so we shouldn't be adding to it anymore.
I'll include a fix for this on the side in a PR I'm doing so no worries 🙂
There was a problem hiding this comment.
I read the code change, and I agree with you. We don't need +1 anymore. Nice catch!
There was a problem hiding this comment.
Oh yeah I forgot about it :( Thanks for the new PR. I will take a look..
|
FYI @vamossagar12 here is the PR: #11959 |
Thanks for that! I thought I will create a follow up PR for moving to StreamsConfigUtils after another issue that I have been working on. You have done it already ! |
…11959) Since the topology-level cache size config only controls whether we disable the caching layer entirely for that topology, setting it to anything other than 0 has no effect. The actual cache memory is still just split evenly between the threads, and shared by all topologies. It's possible we'll want to change this in the future, but for now we should make sure to log a warning so that users who do try to set this override to some nonzero value are made aware that it doesn't work like this. Also includes some minor refactoring plus a fix for an off-by-one error in #11796 Reviewers: Luke Chen <showuon@gmail.com>, Walker Carlson <wcarlson@confluent.io>, Sagar Rao <sagarmeansocean@gmail.com>
* KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes" (apache#11796) Implements KIP-770 Reviewers: Guozhang Wang <wangguoz@gmail.com> (cherry picked from commit 0924fd3) * KAFKA-13945: add bytes/records consumed and produced metrics (apache#12235) Implementation of KIP-846: Source/sink node metrics for Consumed/Produced throughput in Streams Adds the following INFO topic-level metrics for the total bytes/records consumed and produced: bytes-consumed-total records-consumed-total bytes-produced-total records-produced-total Reviewers: Kvicii <Karonazaba@gmail.com>, Guozhang Wang <guozhang@apache.org>, Bruno Cadonna <cadonna@apache.org> * Update CODEOWNERS (#721) (cherry picked from commit ad18b43) Co-authored-by: vamossagar12 <sagarmeansocean@gmail.com> Co-authored-by: A. Sophie Blee-Goldman <sophie@confluent.io> Co-authored-by: Chris Johnson <53450846+cjohnson-confluent@users.noreply.github.com>
…nput.buffer.max.bytes" (apache#11796)" This reverts commit 0924fd3.
Implements KIP-770.