MINOR: reduce allocations in log start and recovery checkpoints #8467
ijuma merged 7 commits into apache:trunk
Conversation
def logsByDir: Map[String, Map[TopicPartition, Log]] = {
  val byDir = new mutable.HashMap[String, mutable.HashMap[TopicPartition, Log]]()
  def addToDir(tp: TopicPartition, log: Log): Unit = {
    byDir.getOrElseUpdate(log.parentDir, new mutable.HashMap[TopicPartition, Log]()).put(tp, log)
In the other PR we used AnyRefMap. Any reason why we are not doing that here?
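For context, a minimal sketch of the AnyRefMap approach being referenced (illustrative names only, not the Kafka code):

```scala
import scala.collection.mutable

// AnyRefMap is an open-addressing mutable map specialized for AnyRef keys.
// Unlike mutable.HashMap, it does not allocate a node object per entry,
// which matters for hot, short-lived maps built on every checkpoint pass.
val byDir = new mutable.AnyRefMap[String, mutable.AnyRefMap[String, Long]]()

def addToDir(dir: String, tp: String, offset: Long): Unit = {
  // getOrElseUpdate creates the inner map only on first sight of a dir
  byDir.getOrElseUpdate(dir, new mutable.AnyRefMap[String, Long]()).put(tp, offset)
}

addToDir("/data/d1", "topic-0", 0L)
addToDir("/data/d1", "topic-1", 5L)
addToDir("/data/d2", "topic-2", 9L)
```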
We should also add a comment about why we are writing non-idiomatic code here, to avoid reverts.
Good point. I will try it and retest and add a comment to prevent regression.
I have addressed this and it's ready for another review pass.
ok to test
Updated benchmarks with the AnyRefMap change: looks marginally better. I'm re-running with
ok to test
retest this please
    byDir.getOrElseUpdate(log.parentDir, new mutable.AnyRefMap[TopicPartition, Log]()).put(tp, log)
  }
  currentLogs.foreach { case (tp, log) => addToDir(tp, log) }
  futureLogs.foreach { case (tp, log) => addToDir(tp, log) }
scala.collection.Map has a def foreachEntry[U](f: (K, V) => U): Unit method that avoids allocating tuples (it's a function with two parameters). If we want to avoid allocations, we should introduce a similar method to Pool and use it here.
I tried out your suggestion and, interestingly, saw more allocations. Note the deliberate use of the var item there, in case the tuple binding itself added an extra allocation.
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1011,8 +1011,8 @@ class LogManager(logDirs: Seq[File],
def addToDir(tp: TopicPartition, log: Log): Unit = {
byDir.getOrElseUpdate(log.parentDir, new mutable.AnyRefMap[TopicPartition, Log]()).put(tp, log)
}
- currentLogs.foreach { case (tp, log) => addToDir(tp, log) }
- futureLogs.foreach { case (tp, log) => addToDir(tp, log) }
+ currentLogs.foreachEntry(addToDir)
+ futureLogs.foreachEntry(addToDir)
byDir
}
diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index 964de7eae2..450f90611b 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -74,6 +74,15 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
def values: Iterable[V] = pool.values.asScala
def clear(): Unit = { pool.clear() }
+
+ def foreachEntry(f: (K, V) => Unit): Unit = {
+ val iter = iterator
+ var item: (K, V) = null
+ while(iter.hasNext) {
+ item = iter.next()
+ f(item._1, item._2)
+ }
+ }
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm 3 2000 thrpt 15 1428889.965 ± 75131.113 B/op
vs this PR:
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm 3 2000 thrpt 15 1284326.850 ± 75148.430 B/op
^ updated text there to say "this PR" rather than trunk.
I think the issue is that you're not using the underlying map iterator, right?
You have to use the iterator of ConcurrentHashMap[K, V], which gives you Map.Entry instead of a tuple. Then you avoid the allocation.
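A sketch of what this suggestion looks like (a simplified stand-in for the Kafka Pool class, not its actual code): walking the underlying ConcurrentHashMap's entry set yields Map.Entry objects owned by the map itself, so no (K, V) tuple is materialized per element.

```scala
import java.util.concurrent.ConcurrentHashMap

class Pool[K, V] {
  private val pool = new ConcurrentHashMap[K, V]()

  def put(k: K, v: V): V = pool.put(k, v)

  // Iterate the Java entry set directly; each step hands f the key and
  // value from a Map.Entry, avoiding the tuple that a Scala (K, V)
  // iterator would allocate for every element.
  def foreachEntry(f: (K, V) => Unit): Unit = {
    val iter = pool.entrySet().iterator()
    while (iter.hasNext) {
      val entry = iter.next()
      f(entry.getKey, entry.getValue)
    }
  }
}

val p = new Pool[String, Int]
p.put("a", 1)
p.put("b", 2)
var sum = 0
p.foreachEntry((_, v) => sum += v)
```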
Do you know what I'm doing wrong when I supply a lambda to the Java .forEach? I know it works for Consumers, but in my brief attempt at improving this I couldn't make it work with the BiConsumer here and had to create a new one.
I will update the body with the rerun jmh results.
AFAICT, it's easy enough to do something like this:
pool.forEach((k,v) => f(k,v))
But then we appear to be back to allocating tuples.
It doesn't appear easy to do something like:
pool.forEach(kv => f(kv.getKey, kv.getValue))
since it's not able to create a BiConsumer for you.
Why do you say that the following would allocate tuples?
def foreachEntry(f: (K, V) => Unit): Unit = {
  pool.forEach((k, v) => f(k, v))
}
Did you measure it? Looking at the code, I don't see any tuples.
@ijuma I had mistakenly thought it was taking a tuple there; maybe that was just how Scala converted the lambda to a BiConsumer. The results had regressed back to close to what I was seeing with the iterator version.
When I ran it, the lambda version:
def foreachEntry(f: (K, V) => Unit): Unit = {
  pool.forEach((k, v) => f(k, v))
}
gave 1140074.937 ± 75151.914 B/op, while the explicit BiConsumer version:
pool.forEach(new BiConsumer[K, V] {
  override def accept(t: K, u: V): Unit = f(t, u)
})
gave 1044120.722 ± 1676.137 B/op (included in an above comment).
Strangely, I have just re-run the BiConsumer version and it returned 1188097.413 ± 1639.537 B/op. I'm not sure why it regressed from the previous run. I'm OK with using the lambda version if you are. I'm not sure I'll have time to investigate further, and it's still a good improvement compared to the version that used foreach.
I have updated the benchmark results in the OP with the new
ok to test
ok to test
retest this please
ok to test
…t-for-generated-requests
* apache-github/trunk: (366 commits)
  MINOR: Improve producer test BufferPoolTest#testCloseNotifyWaiters. (apache#7982)
  MINOR: document how to escape json parameters to ducktape tests (apache#8546)
  KAFKA-9885; Evict last members of a group when the maximum allowed is reached (apache#8525)
  KAFKA-9866: Avoid election for topics where preferred leader is not in ISR (apache#8524)
  KAFKA-9839; Broker should accept control requests with newer broker epoch (apache#8509)
  KAKFA-9612: Add an option to kafka-configs.sh to add configs from a prop file (KIP-574)
  MINOR: Partition is under reassignment when adding and removing (apache#8364)
  MINOR: reduce allocations in log start and recovery checkpoints (apache#8467)
  MINOR: Remove unused foreign-key join class (apache#8547)
  HOTFIX: Fix broker bounce system tests (apache#8532)
  KAFKA-9704: Fix the issue z/OS won't let us resize file when mmap. (apache#8224)
  KAFKA-8639: Replace AddPartitionsToTxn with Automated Protocol (apache#8326)
  MINOR: equals() should compare all fields for generated classes (apache#8539)
  KAFKA-9844; Fix race condition which allows more than maximum number of members (apache#8454)
  KAFKA-9823: Remember the sent generation for the coordinator request (apache#8445)
  KAFKA-9883: Add better error message when REST API forwards a request and leader is not known (apache#8536)
  KAFKA-9907: Switch default build to Scala 2.13 (apache#8537)
  MINOR: Some html fixes in Streams DSL documentation (apache#8503)
  MINOR: Enable fatal warnings with scala 2.13 (apache#8429)
  KAFKA-9852: Change the max duration that calls to the buffer pool can block from 2000ms to 10ms to reduce overall test runtime (apache#8464)
  ...
For brokers with replica counts > 4000, allocations from logsByDir become substantial. logsByDir is called frequently by LogManager.checkpointLogRecoveryOffsets and LogManager.checkpointLogStartOffsets. The approach used is similar to the one from the checkpointHighwatermarks change in #6741.
Are there better ways to structure our data structures to avoid creating logsByDir on demand for each checkpoint iteration? This micro-optimization helps as is, but it would be better if we could avoid the rebuild entirely.
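One possible answer to that question, sketched here under assumptions (DirIndex and its members are hypothetical names, not Kafka code): maintain the per-directory view incrementally as logs are added and removed, so each checkpoint pass only reads it. The real LogManager would also need to make this thread-safe.

```scala
import scala.collection.mutable

// Keep byDir up to date on every add/remove instead of rebuilding it
// from currentLogs/futureLogs on each checkpoint pass. parentDir maps
// a log to its directory (in Kafka this would be log.parentDir).
class DirIndex[TP <: AnyRef, Log <: AnyRef](parentDir: Log => String) {
  private val byDir = new mutable.AnyRefMap[String, mutable.AnyRefMap[TP, Log]]()

  def add(tp: TP, log: Log): Unit =
    byDir.getOrElseUpdate(parentDir(log), new mutable.AnyRefMap[TP, Log]()).put(tp, log)

  def remove(tp: TP, log: Log): Unit = {
    val dir = parentDir(log)
    byDir.get(dir).foreach { logs =>
      logs.remove(tp)
      if (logs.isEmpty) byDir.remove(dir) // drop directories with no logs left
    }
  }

  // Checkpointing reads this directly; no per-call allocation.
  def logsByDir: collection.Map[String, collection.Map[TP, Log]] = byDir
}

// Illustrative usage with "dir:name" strings standing in for Log objects.
val idx = new DirIndex[String, String](log => log.split(':')(0))
idx.add("topic-0", "/data/d1:log0")
idx.add("topic-1", "/data/d1:log1")
```

The trade-off is a small cost and extra bookkeeping on every add/remove in exchange for allocation-free checkpoints; whether that is worth it depends on how often logs move relative to checkpoint frequency.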
JMH benchmark results:
For the 2000-topic, 3-partition case, normalized allocations drop from 5877881 B/op to 1284190.774 B/op, a reduction of 78%.
Some allocation profiles from a mid-sized broker follow. I have seen worse, but these add up to around 3.8% on a broker whose GC overhead was around 30% of CPU time. You could argue that this is relatively small, but it seems worthwhile for a low-risk change.

