KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes" #11424
guozhangwang merged 18 commits into apache:trunk from
Conversation
@guozhangwang Implementation of the KIP. Note that the number of changed files is large, as this renames the cache size config.
@ableegoldman , could you also review this whenever you get the chance?
ableegoldman
left a comment
Thanks for the PR! Lmk if you have any questions about any of my comments/questions -- I think the main thing is just that we, unfortunately, can't get rid of any of the code handling the old configs just yet, since not everyone will upgrade their code off of deprecated APIs right away (though we wish they would!)
Seems like this is exactly the same logic as getCacheSizePerThread, can we instead just change the name of the existing method to match both cases rather than writing duplicate code? Maybe something like getMemorySharePerThread or getMemorySizePerThread?
Same here, if it's going to be exactly the same then we only need one method.
Although, I don't think the GlobalThread actually even has an input buffer the way StreamThreads do (doesn't seem to make sense for it to need one because it can just process all of its polled records right away, whereas the StreamThread may need to buffer them for various reasons)
You could still probably combine into a single method and just include a flag for whether or not to call resize on the global thread (with it being true for the cache case, and false for the input buffer resizing)
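One possible shape for the combined helper suggested here is sketched below. The method name, the even-split assumption, and the flag are illustrative, not the actual Kafka Streams code:

```java
// Hedged sketch: a single per-thread sizing helper serving both the
// state-store cache and the input buffer. The flag models the point above
// that the global thread shares the cache but has no input buffer of its own.
final class MemoryShare {
    static long perThread(final long totalBytes,
                          final int numStreamThreads,
                          final boolean includeGlobalThread) {
        final int divisor = numStreamThreads + (includeGlobalThread ? 1 : 0);
        // Guard against division by zero before any threads exist.
        return divisor == 0 ? totalBytes : totalBytes / divisor;
    }
}
```

With this shape, the cache case would pass `includeGlobalThread = true` and the input-buffer case `false`.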
+1 here as well. I think we would always resize both buffer and state cache at the same time moving forward.
I think you still need (or at least want) to keep these deprecated configs defined
nit: call this one resizeCache to differentiate between this and the buffer size? Also, is there a method somewhere to update the input buffer size? Seems like we're always resizing the cache when a thread is added/removed
Haha yeah that's a good catch. Missed that part. This is the method which is supposed to do the same
Do we need this? Seems like we can get rid of the extra maxBufferSizeBytes and instead just directly read out the value of maxBufferResizeSize (in which case we should probably rename maxBufferResizeSize to maxBufferSizeBytes -- my point is, I think we can get away with just storing the current size of the buffer as a long and the max size as an AtomicLong that can be updated from outside the thread)
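The long-plus-AtomicLong layout suggested here could look like the following sketch; the class and method names are hypothetical:

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of the suggestion above: the thread tracks its current
// buffered bytes as a plain long (only the thread itself mutates it), while
// the configured maximum lives in an AtomicLong so that a resize triggered
// from outside the thread (e.g. when adding/removing threads) is visible here.
final class InputBufferGauge {
    private long currentBytes = 0L;          // thread-local bookkeeping
    private final AtomicLong maxBytes;       // updated from outside the thread

    InputBufferGauge(final long initialMaxBytes) {
        this.maxBytes = new AtomicLong(initialMaxBytes);
    }

    void add(final long bytes)      { currentBytes += bytes; }
    void remove(final long bytes)   { currentBytes -= bytes; }
    void resizeMax(final long newMax) { maxBytes.set(newMax); }

    boolean overLimit() { return currentBytes > maxBytes.get(); }
}
```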
Thanks for the PR! Lmk if you have any questions about any of my comments/questions -- I think the main thing is just that we, unfortunately, can't get rid of any of the code handling the old configs just yet, since not everyone will upgrade their code off of deprecated APIs right away (though we wish they would!)
Thanks for the review. I have added some comments/questions. The other suggestions make total sense and I will incorporate them.
This logic is a little difficult to read 😅 -- would it help to just move the bufferSize -= processedData.totalBytesConsumed; to before this check?
Although on that note, it might be cleaner to just move this check regarding whether to resume consuming to right before we call poll, that way there's a nice symmetry between the pause and resume checks, and all the logic is consolidated to one place
Haha :D
The reason I have this check => bufferSize > maxBufferSizeBytes is that the resume should happen only if, after the current round of consumption, the buffer size that had breached the threshold now went below it. Without that check, it will always enter the if block, even when it's already lower (and we subtract something more and reduce it further). Did that make sense? :D
The idea of placing it here is that, right after removing some records from the buffer, we check whether the buffer size came down. It's similar to how the StreamThread resume used to work (the one I removed). This logic can very well go before poll, but I thought adding it here was more non-invasive, as there's already some metrics-related stuff and other things happening in this block.
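The condition being described can be isolated into a tiny predicate: resume only when the buffer was over the limit before this round of processing and has dropped back under it afterwards. All names here are illustrative:

```java
// Minimal illustration of the "crossed back under the threshold" check
// discussed above; not the actual Streams code.
final class ResumeCheck {
    static boolean shouldResume(final long bufferSizeBefore,
                                final long bytesConsumed,
                                final long maxBufferSizeBytes) {
        final long bufferSizeAfter = bufferSizeBefore - bytesConsumed;
        // Resume only on the transition from over-limit to under-limit.
        return bufferSizeBefore > maxBufferSizeBytes
            && bufferSizeAfter <= maxBufferSizeBytes;
    }
}
```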
I also feel this logic is a bit awkward, starting from the fact that we need to report how many bytes we've consumed from the process :) I think we can simply do the following:
At the end of the polling phase, and at the end of the process loop (a.k.a. here), we loop over all the active tasks and get their "input buffer size", which would delegate to each task's corresponding PartitionGroup and then RecordQueue. Based on that we can decide whether to resume / pause accordingly. Then:
- we do not need to maintain a local bufferSize at the task here, i.e. we always re-compute from the task's record queue, which is the source of truth.
- we do not need to maintain and propagate up the consumed bytes within each iteration here.
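A minimal sketch of this recompute-from-the-source-of-truth idea, with tasks modeled as plain suppliers of their buffered byte counts (in the real code the delegation would go through each task's PartitionGroup and RecordQueue):

```java
import java.util.List;
import java.util.function.LongSupplier;

// Sketch of the approach described above: instead of maintaining a running
// bufferSize counter, re-compute the total from each task's record queue
// after polling and after the process loop, then pause or resume the
// consumer based on the configured maximum. Task types are stand-ins.
final class BufferCheck {
    static long totalBufferedBytes(final List<LongSupplier> tasks) {
        return tasks.stream().mapToLong(LongSupplier::getAsLong).sum();
    }

    /** True if the consumer should be paused, false if it may resume. */
    static boolean shouldPause(final List<LongSupplier> tasks, final long maxBytes) {
        return totalBufferedBytes(tasks) > maxBytes;
    }
}
```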
Why do we only pause the non-empty partitions? If the buffer is full, we have to pause all of them, no?
Yeah.. this was something that was discussed/decided in the JIRA conversation. You can find the explanation here: https://issues.apache.org/jira/browse/KAFKA-13152?focusedCommentId=17400647&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17400647
Ah now I remember, thanks 😄
Can you leave a comment above this method explaining this? Don't want to forget why we did this a year from now and then accidentally break things
Unfortunately we can't remove this logic yet, since some users may still be setting only the buffered.records.per.partition config, and they shouldn't see a sudden explosion of memory just because they didn't switch over to the new config right away
I see... but if both configs are in play, won't that also lead to pause/resume of partitions based upon whichever threshold is breached? Maybe we can add some check that we do this only if maxBufferedSize is set to some value? We might also want to consider whether it has a default value and use the condition accordingly.
Maybe we can add some check that we do this only if maxBufferedSize is set to some value?
Yeah, sorry, I should have been more clear here -- we only need to continue doing this if the user is still setting the buffered.records.per.partition config. I mentioned this in another comment, but just in case you weren't aware, you can call originals() on a StreamsConfig object to get a map of the actual configs passed in by the user -- that way you know what they actually set vs what's just a default.
Then you can just set maxBufferedSize to null or define a static constant NOT_SET = -1 and then only continue doing this partition-level pause/resume if the user is still using buffered.records.per.partition. Does that make sense?
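A hedged sketch of the sentinel approach described above. The config key is the real one and `originals()` is a real StreamsConfig method, but the surrounding class and the `NOT_SET` handling are illustrative:

```java
import java.util.Map;

// Sketch: keep the deprecated per-partition record limit active only if the
// user explicitly set it. `originals` stands in for StreamsConfig#originals(),
// i.e. only the configs the user actually passed in, not defaults.
final class BufferedRecordsCompat {
    static final int NOT_SET = -1;
    static final String BUFFERED_RECORDS_PER_PARTITION = "buffered.records.per.partition";

    static int effectiveMaxBufferedRecords(final Map<String, ?> originals) {
        if (originals.containsKey(BUFFERED_RECORDS_PER_PARTITION)) {
            return Integer.parseInt(String.valueOf(originals.get(BUFFERED_RECORDS_PER_PARTITION)));
        }
        return NOT_SET; // partition-level pause/resume is then skipped entirely
    }
}
```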
guozhangwang
left a comment
Thanks for the PR! I made a first pass on it.
nit: Could we merge these two info lines into a single one? It seems a bit redundant to log twice here. Also this new log line seems wrong since it has four parameters but only three values provided.
E.g.
Adding StreamThread-{}, the current total number of threads is {}, each thread now has a buffer size of {} and cache size of {}
And
Terminating StreamThread-{}, the current total number of threads is {}, each thread now has a buffer size of {} and cache size of {}
nit: ditto here. See above for the consolidated log line. Here we can emphasize it is "Terminating newly added threads".
Not sure I understand the logic here: it seems we only call setBytesConsumed once with 0 here, and there are no other callers?
Also, even with the correct logic, I'm wondering if we can just define it as a local variable within the process here instead of augmenting the Task interface?
This does not look right to me: why do we use the size value read from cacheResizeSize to assign to maxBufferSizeBytes? They should be totally orthogonal.
I think we can simplify the logic and do not need to keep track of "consumed bytes" within a task here, see my other comment.
This function seems not used.
BTW if we do not maintain the local bufferSize then we would not need it anyways :)
See my other comment: I think we can avoid propagating both processed-records and processed-bytes from the process call.
6f6316d to 656c01b
@vamossagar12 the jenkins failures are due to compilation warnings. This seems not relevant to the PR; I will re-trigger it.
656c01b to 827ea22
guozhangwang
left a comment
Thanks @vamossagar12 , I've made a second pass and left some more comments.
Regarding the metric name, I will ping @ableegoldman and @cadonna again and get back to you.
nit: could we add a TODO here that this logic should be removed once we remove the deprecated old config as well?
nit: we do not need the ?focusedCommentId=17400647&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17400647 suffix in the javadoc :)
Oh that's because the ticket has a lot of comments and I intended to point to the comment which talks about the design decision made here. If it doesn't make sense, will remove it :)
This function seems not addressed? Or am I missing something?
These functions seem not used any more?
This comment seems not addressed.
This is being used now. StreamThread delegates to this function to get the totalBytesBuffered.
Thanks @guozhangwang . I had one question on the changes suggested by you.
Just replied on your question. LMK if that works or not.
827ea22 to 723a4ce
Yeah it does. Made the changes. Please let me know when we have the decision on the metric name. Also, please review if any other changes are needed.
Thanks @vamossagar12 , I do not have further comments now, re-triggering the jenkins tests.
@vamossagar12 there are some compilation errors in jenkins, could you rebase from latest trunk and check what's the issue? Please ping me once that's resolved.
@guozhangwang , it's due to the usage of deprecated configs and gradle having a check against it. In my local build I commented out that check for compile/test/checkstyle etc. I think we don't want to get rid of these configs immediately, so these usages are needed for now. What would you suggest here?
@vamossagar12 Note that
hey @guozhangwang , For
@guozhangwang , I tried to address this again, but what I see is that since the 2 deprecated configs are topology-level configs, they are being set/checked in TopologyConfig. As I said, for backward compatibility reasons, we are checking whether it has been set by the user or not. Looks like the values being set here are further being used for getting/setting task-level configs.
Hey @guozhangwang , did you get a chance to look at my above comment?
Ah yes, for StreamsConfig, in order to maintain backward compatibility we still need to reference some deprecated configs. So:
Does that sound good to you? If yes, we can go ahead and make it done.
723a4ce to 4244f5a
Thanks @guozhangwang . I added that annotation wherever applicable. Now the only thing pending is the renaming of the total-bytes metric :)
@vamossagar12 seems jenkins still fails, but due to
@cadonna suggested renaming the metric to
@guozhangwang , I fixed that. Hopefully this time we won't see errors related to this PR; I will also check it. Also, @ableegoldman , thanks for confirming the metric name. I have updated the PR and also sent out an update on the KIP.
I'm fine with it. BTW I thought we had a similar metric for the store cache, but after checking I realized we actually do not have one yet :P Maybe we can add it later.
There are still some failures in Jenkins; I'm gonna retrigger them. If they still fail, @vamossagar12 please take a look and see if they are relevant.
…ache size config overrides
8dd3457 to a69d329
@guozhangwang , sorry, it was an oversight on my part. Looking at the names, it wasn't evident that these were related to this PR, but apparently they were :D I ran the tests twice locally, and there are 3-4 which fail intermittently due to timeout errors. When I re-ran them, even those seemed to pass. Let's see what happens in this run.
Cool. Thanks @vamossagar12
Looks like the tests passed this time. The ARM build has some issues in JDK 8 and JDK 17. Do you think it's better now?
Also, I created a new JIRA https://issues.apache.org/jira/browse/KAFKA-13624 for adding a new metric for cache size. I am assuming it needs a KIP and hence added the needs-kip tag to it.
I checked the jenkins failures and they are irrelevant indeed this time. Merging to trunk now. Thank you so much @vamossagar12 for the great contribution! Please feel free to update the JIRA ticket and the KIP wiki as well.
Thanks @guozhangwang and @ableegoldman for the support on this one! Also, this ticket => https://issues.apache.org/jira/browse/KAFKA-13624, do you think it makes sense? Do we need a KIP for this? |
        }
    } else {
-       cacheSize = globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+       cacheSize = globalAppConfigs.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
Hey guys, sorry I didn't get around to doing another full review of this earlier. This is a bug, we should always check whether the old deprecated config has been set and use that value if it was set and the new config was not.
I noticed something similar in the TopologyTestDriver, although I'm not sure how much of an effect that would have since AFAICT there's not really "caching" in the TTD -- at least, there's not supposed to be according to the javadocs
@ableegoldman , I have added that check. The line above is in the else block when neither of the 2 configs are set. Here's the complete block of code =>
if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) ||
isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides) && isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
log.info("Topology {} is using both {} and deprecated config {}. overriding {} to {}",
topologyName,
STATESTORE_CACHE_MAX_BYTES_CONFIG,
CACHE_MAX_BYTES_BUFFERING_CONFIG,
STATESTORE_CACHE_MAX_BYTES_CONFIG,
cacheSize);
} else if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
log.info("Topology {} is using deprecated config {}. overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
} else {
cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
log.info("Topology {} is overriding {} to {}", topologyName, STATESTORE_CACHE_MAX_BYTES_CONFIG, cacheSize);
}
} else {
cacheSize = globalAppConfigs.getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
}
Am I missing something here?
Ah, I see the confusion. The #isTopologyOverride method checks whether the config has been overridden for the specific topology, ie has been set in the Properties passed in to StreamsBuilder#build -- it's not looking at what we call the globalAppConfigs which are the actual application configs: ie those passed in to the KafkaStreams constructor.
So basically there are two sets of configs. The value should be taken as the first of these to be set by the user, in the following order:
1) statestore.cache.max.bytes in topologyOverrides
2) cache.max.bytes.buffering in topologyOverrides
3) statestore.cache.max.bytes in globalAppConfigs
4) cache.max.bytes.buffering in globalAppConfigs
Essentially, using #getTotalCacheSize on the topologyOverrides if either of them is set (which this PR is doing) and on the globalAppConfigs if they are not (which is the regression here).
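That four-step precedence could be sketched as follows; the two key strings are the real config names, while the resolver class itself is illustrative, not the actual `#getTotalCacheSize` implementation:

```java
import java.util.Map;
import java.util.Optional;

// Sketch of the precedence described above: take the first value set by the
// user, checking topology overrides before the global application configs,
// and the new config name before the deprecated one at each level.
final class CacheSizeResolver {
    static final String NEW_KEY = "statestore.cache.max.bytes";
    static final String OLD_KEY = "cache.max.bytes.buffering";

    static long resolve(final Map<String, Long> topologyOverrides,
                        final Map<String, Long> globalAppConfigs,
                        final long defaultBytes) {
        return Optional.ofNullable(topologyOverrides.get(NEW_KEY))         // 1)
            .or(() -> Optional.ofNullable(topologyOverrides.get(OLD_KEY))) // 2)
            .or(() -> Optional.ofNullable(globalAppConfigs.get(NEW_KEY)))  // 3)
            .or(() -> Optional.ofNullable(globalAppConfigs.get(OLD_KEY)))  // 4)
            .orElse(defaultBytes);
    }
}
```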
On that note -- we also need to move #getTotalCacheSize out of StreamsConfig, because it's a public class and this method wasn't listed as a public API in the KIP (nor should it be, imo). I recommend creating a new static utility class for things like this, eg StreamsConfigUtils in the org.apache.kafka.streams.internals package. There are some other methods that would belong there, for example the StreamThread methods #processingMode and #eosEnabled should be moved as well.
Hope that all makes sense -- and lmk if you don't think you'll have the time to put out a full patch, and I or another Streams dev can help out 🙂
Thanks @ableegoldman! I am a bit occupied currently, but I will make the changes in the next few days. As you said, I will introduce a new utility class and move these methods out.
Sounds good! There's no rush, but I'll make sure we have your new PRs reviewed and merged quickly whenever they are ready, since you've worked so hard on this already. I'm sorry I wasn't able to make another pass on your original PR, but hopefully this won't be too much of a bother.
Sorry just catching up on this comment @ableegoldman . That's perfectly fine!
final ThreadCache cache = new ThreadCache(
    logContext,
-   Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),
+   Math.max(0, streamsConfig.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG)),
Ditto to this comment, we can't just read off the new statestore.cache.max.bytes without checking whether the old cache.max.bytes.buffering was set
Oh here, I thought since it's a test case so it shouldn't really matter. Isn't that the case?
since it's a test case so it shouldn't really matter. Isn't that the case?
Well, if someone is using the TTD to write unit tests and those tests start to fail after they upgrade because the caching is different, I would say that's a compatibility change.
Although I read the TTD's javadocs earlier and remembered that it actually processes records synchronously, which effectively means that the only thing that matters/affects the TTD results is whether the cache size is non-zero or has been set to 0 -- and setting it to 0 only matters if it's correctly set to 0 in the TopologyConfig, not the value here. Which is a long way of saying that in hindsight, this config/bug doesn't really impact anything after all. In fact, imho we should probably just hard-code the TTD's ThreadCache cache size to 0 -- but let's not wrap that change into an already rather large PR in case there's something I'm not taking into account here.
So tl;dr, for future reference we do still need to maintain backwards compatibility in the TTD since it's part of the public interface. But it just so happens that this particular bug doesn't actually break anything "real" or have any visible impact (at least AFAICT).
Got it. If I understood correctly, we need a new PR with all the changes in this PR and the new ones, along with document changes, right?
Got it. If I understood correctly, we need a new PR with all the changes in this PR and the new ones, along with document changes, right?
That sounds right to me.
Seems this PR breaks backward compatibility. Will need to revert it for now. Sorry. @vamossagar12 -- can you do a new PR containing the commit plus the required fixes? Happy to help review if necessary.
Btw: this PR does not update the docs -- we should do docs updates, too. (Also ok to do in a follow-up PR.) To ensure backward compatibility, it might also be good to split the actual PR into two: first, do the internal config change without rewriting any tests -- this should ensure that no existing test breaks. In a follow-up PR we update the tests to use the new config. And we should add a test that explicitly exercises the old config to verify backward compatibility.
@ableegoldman @mjsax My read on the code is that we only need to change the TopologyTestDriver, while the first place seems fine to me. Did I miss anything?
I did not dig into the details myself. Anyway, might be better to discuss on the new PR?
If by "first place" you mean the bug in the TopologyConfig class, then no, the other way around actually. The TopologyConfig bug is the one that actually breaks compatibility; the TTD one doesn't really do anything -- if I understand the TTD correctly. See this
Crystal! Thanks for the clarification @ableegoldman |
// and then resize them later
streamThread = createAndAddStreamThread(0L, 0L, threadIdx);
final int numLiveThreads = getNumLiveStreamThreads();
resizeThreadCacheAndBufferMemory(numLiveThreads + 1);
One more thing -- @wcarlson5 mentioned there was an off-by-one error here due to the re-ordering of these calls. Specifically, the + 1 in this line was necessary before now because we called resizeThreadCache before actually adding the new thread, so we had to account for the new thread by adding one. But since we now create/add the new thread first, the getNumLiveStreamThreads method will actually return the correct number of threads, so we don't need the + 1 anymore.
On that note, I take it we reordered these calls because we now create the thread without the cache value and then call resize to set the cache after the thread has already been created. I was wondering: why do we need to do this post-construction resizing? I only looked at this part of the PR briefly, but it seems to me like we always have the actual cache size known when we're creating the thread, so can't we just pass that in to the StreamThread#create method/constructor? It's just a bit confusing to initialize the cache size to some random value, it took me a little while to figure out what was going on with that
I was wondering: why do we need to do this post-construction resizing?
The main motivation is to consolidate the resizing of the thread cache and buffer within a single call. More details can be found in this comment thread: #11424 (comment). I suggested we initialize the new thread with a 0 value -- so it should not be a random value? -- and then resize (at that time we have the correct number of threads to divide by).
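The off-by-one discussed above can be illustrated with a small model of the add-then-resize ordering; the class and method names are stand-ins, not the real KafkaStreams code:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model: since the new thread is added to the live set *before*
// resizing, the live count already includes it, so the divisor must not be
// incremented again (no "+ 1").
final class ThreadResizeModel {
    private final List<String> liveThreads = new ArrayList<>();
    long cachePerThread;

    void addThreadAndResize(final String name, final long totalCacheBytes) {
        liveThreads.add(name);                       // thread added first...
        resize(totalCacheBytes, liveThreads.size()); // ...so the count is already correct
    }

    private void resize(final long totalCacheBytes, final int numLiveThreads) {
        cachePerThread = totalCacheBytes / numLiveThreads;
    }
}
```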
@vamossagar12 -- Any update from your side about opening a new PR?
@mjsax not yet. I couldn't find the time to pick this up; will send an update soon.
PR implementing KIP-770 (#11424) was reverted as it brought in a regression wrt pausing/resuming the consumer. That KIP also introduced a change to deprecate config CACHE_MAX_BYTES_BUFFERING_CONFIG and replace it with STATESTORE_CACHE_MAX_BYTES_CONFIG. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Thanks for continuing to look into this issue and trying to get it merged. After chatting with you offline I thought a bit more about the logic of pause / resume. And I think that besides the additional logic of:
We also do the following:
PR implementing KIP-770 (apache#11424) was reverted as it brought in a regression wrt pausing/resuming the consumer. That KIP also introduced a change to deprecate config CACHE_MAX_BYTES_BUFFERING_CONFIG and replace it with STATESTORE_CACHE_MAX_BYTES_CONFIG. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org> Co-authored-by: vamossagar12 <sagarmeansocean@gmail.com>
This PR is an implementation of: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390. The following changes have been made: