MINOR: log warning when topology override for cache size is non-zero #11959
Conversation
Hey @vamossagar12 can you take a look at this? Also cc @guozhangwang @wcarlson5

@ableegoldman, I did a quick pass. Looks good to me.
showuon
left a comment
Overall LGTM. Left some minor comments. Thank you.
```diff
-            resizeThreadCacheAndBufferMemory(numLiveThreads + 1);
+            resizeThreadCacheAndBufferMemory(numLiveThreads);
             log.info("Adding StreamThread-{}, there are now {} threads with cache size/max buffer size values as {} per thread.",
-                threadIdx, numLiveThreads + 1, getThreadCacheAndBufferMemoryString());
+                threadIdx, numLiveThreads, getThreadCacheAndBufferMemoryString());
```
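The off-by-one being fixed here can be sketched as follows. This is a hypothetical helper with illustrative numbers, not the actual Kafka Streams code: the total cache is split evenly across live threads, and since the live-thread count has already been incremented by the time the resize happens, adding one again shrinks each thread's share.

```java
public class CacheSplitSketch {
    // Hypothetical helper mirroring the even per-thread cache split;
    // the name and numbers are illustrative only.
    static long perThreadCacheBytes(final long totalCacheBytes, final int numLiveThreads) {
        return totalCacheBytes / numLiveThreads;
    }

    public static void main(final String[] args) {
        final long totalCacheBytes = 10L;
        final int numLiveThreads = 2; // this count already includes the newly added thread

        // Buggy sizing: adds one to a count that was already incremented.
        final long buggy = perThreadCacheBytes(totalCacheBytes, numLiveThreads + 1);
        // Fixed sizing: uses the live-thread count directly.
        final long fixed = perThreadCacheBytes(totalCacheBytes, numLiveThreads);

        System.out.println(buggy + " vs " + fixed); // 3 vs 5
    }
}
```

With a 10-byte cache and 2 live threads, the buggy path reports 3 bytes per thread instead of 5, matching the wrong values the integration test below ended up asserting.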
I think we should add tests for this, to make sure that after adding a thread, the cache size and buffer memory are set as expected.
@showuon, there are some tests for this here: https://github.com/vamossagar12/kafka/blob/84c75ce44167e5e1bf8748a7ff9ba95309cd3d7e/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java#L373, which were added as part of the original PR. Do you think they are enough?
That test covers thread removal, right? We should have similar tests for thread addition; otherwise we'd have been able to catch this issue via those tests.
Ok, yeah, there are 2 more tests for thread replacement which seem to be printing out an extra thread count. But I agree, a test for thread addition would be good.
```diff
-                CACHE_MAX_BYTES_BUFFERING_CONFIG);
-        } else if (isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides)) {
-            cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
+        final boolean stateStoreCacheMaxBytesOverridden = isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides);
```
@ableegoldman, another thing: my initial logic was similar to what you have below (without the boolean variables stateStoreCacheMaxBytesOverridden and cacheMaxBytesBufferingOverridden), but I had to change it to the above because I was getting:

```
[2022-03-29T06:53:02.757Z] [ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11959/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:127:5: NPath Complexity is 960 (max allowed is 500). [NPathComplexity]
```

Looks like this PR suffers from the same problem.
Ah :/

I think this is a case where it's reasonable to suppress the checkstyle exception, since the NPath complexity here is somewhat "artificial": the code is easy to follow, it just has many branches because it goes through each config.
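For context on why the rule fires on code like this, a small sketch of how NPath complexity grows (illustrative numbers only; the reported 960 comes from a mix of two- and three-way branches rather than a clean power of two):

```java
public class NPathSketch {
    public static void main(final String[] args) {
        // NPath complexity multiplies across sequential branches: k independent
        // if/else checks yield 2^k execution paths, even when each branch is
        // trivial to read. Ten "one branch per config" checks already blow past
        // the limit of 500 in the checkstyle error above.
        final int branches = 10;
        final long paths = 1L << branches;
        System.out.println(paths); // 1024
    }
}
```

This is why flat, readable "check each config" code can still trip the rule, and why suppressing it can be more honest than contorting the logic.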
Ok, yeah, we can suppress it, but after reading the error I also felt it has too many branches. Anyway, it's fine I guess.
@showuon addressed your comment and added test coverage for the cache & buffer size after adding a thread, please give a +1 if all looks good
wcarlson5
left a comment
I don't have anything else to add tbh
showuon
left a comment
Thanks for the update and the tests. LGTM!
```java
        for (final String log : appender.getMessages()) {
            // after we replace the thread there should be two remaining threads with 5 bytes each
            if (log.endsWith("Adding StreamThread-3, there are now 3 threads with cache size/max buffer size values as 3/178956970 per thread.")) {
```
Apparently there was already a test for the cache being sized correctly after a thread replacement/addition; I'm guessing it was updated with the incorrect values in the PR that introduced the off-by-one bug. It probably should have been a red flag that the value of the cache size changed here, not to mention that the comment above explicitly says it should be 5 bytes per thread 🙂
Yeah, the comment is off.
No, actually, I meant that the comment was correct -- the test was just verifying incorrect results (after the thread replacement there should be 2 threads with 5MB of cache, as it says). But no worries
```java
    @Test
    public void shouldResizeMaxBufferAfterThreadReplacement() throws InterruptedException {
```
This test seems to be identical to the one above except that it uses the default cache size and a custom input buffer size. However, both tests still validate both sizes in the log message, so this doesn't need to be a separate test -- we can just cover both in the original test.

(The total time to run the Streams tests has been getting a bit out of hand lately, so we should have a concrete reason to split something out into a second test like this, especially for the "heavier" integration tests.)
All test failures are in Connect, so unrelated. Going to merge
Merged to trunk

Thanks for fixing the tests, @ableegoldman!
…on-zero (apache#11959)"

This reverts commit 1317f3f.
Since the topology-level cache size config only controls whether we disable the caching layer entirely for that topology, setting it to anything other than 0 has no effect. The actual cache memory is still just split evenly between the threads, and shared by all topologies. It's possible we'll want to change this in the future, but for now we should make sure to log a warning so that users who do try to set this override to some nonzero value are made aware that it doesn't work like this.
Also includes some minor refactoring plus an off-by-one fix from #11796
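The override behavior described in the summary can be sketched as follows. The class and method names here are made up for illustration; the real logic lives in Kafka Streams' TopologyConfig:

```java
public class TopologyCacheOverrideSketch {
    // Hypothetical mirror of the described behavior: a topology-level cache
    // size override only matters when it is 0 (disables caching for that
    // topology); any nonzero value is ignored, so we warn instead.
    static String describeOverride(final long topologyCacheBytes) {
        if (topologyCacheBytes != 0) {
            // Nonzero topology-level overrides have no effect: the cache is
            // shared by all topologies and split evenly between threads.
            return "WARN: topology-level cache size override (" + topologyCacheBytes
                    + " bytes) has no effect; cache memory is shared by all topologies"
                    + " and split evenly between threads.";
        }
        // A value of 0 is the one override that matters: it disables caching.
        return "Caching disabled for this topology.";
    }

    public static void main(final String[] args) {
        System.out.println(describeOverride(0L));
        System.out.println(describeOverride(10485760L));
    }
}
```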