Improve performance of checkpointHighWatermarks, patch 1/2 #6741

Merged
ijuma merged 10 commits into apache:trunk from gardnervickers:checkpoint-highwatermarks-alloc-1 on Mar 26, 2020

@gardnervickers gardnervickers commented May 15, 2019

This PR works to improve high watermark checkpointing performance.

ReplicaManager.checkpointHighWatermarks() was found to be a major contributor to GC pressure, especially on Kafka clusters with high partition counts and low throughput.

Commit gardnervickers/kafka-1@4307a73

Adds a JMH benchmark for checkpointHighWatermarks which establishes a baseline for the commits which come after it.

The parameterized benchmark was run with 100, 1000 and 2000 topics.

Commit 27c1ba1

Modifies ReplicaManager.checkpointHighWatermarks() to avoid extra copies, and caches the Log parent directory to avoid the frequent allocations made by File.getParent(). It appears that Log.dir is currently read from ReplicaManager.checkpointHighWatermarks without taking Log.lock, and this PR does the same for the cached Log.parentDir. I considered making the two values change atomically, but I could not think of a case where skipping the lock is safe with the current implementation yet unsafe with the proposed changes.
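
The caching idea can be sketched roughly as follows. This is a minimal illustration and not the code in this PR: `CachedDirLog` and `renameDir` are hypothetical names, and only the directory handling of a log is modelled. Like the description above, it reads the fields without a lock, so a reader may briefly see a new `dir` paired with the old `parentDir`.

```scala
import java.io.File

// Hypothetical stand-in for a log object; only directory handling is modelled.
final class CachedDirLog(initialDir: File) {
  // The mutable fields stay private; callers only see the accessors below.
  @volatile private var _dir: File = initialDir
  @volatile private var _parentDir: String = initialDir.getParent

  def dir: File = _dir

  // Returns the cached String instead of recomputing dir.getParent on every call.
  def parentDir: String = _parentDir

  // Hypothetical rename hook: refreshes both fields, but not atomically.
  def renameDir(newDir: File): Unit = {
    _dir = newDir
    _parentDir = newDir.getParent
  }
}
```

A checkpoint loop that calls `parentDir` once per partition then reuses the same String instance until the directory is renamed.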

Improvements over gardnervickers/kafka-1@4307a73 (baseline)

| Topic Count | Ops/ms | MB/sec allocated |
|-------------|--------|------------------|
| 100         | +51%   | -91%             |
| 1000        | +143%  | -49%             |
| 2000        | +149%  | -50%             |

Also:

  • Changed all usages of Log.dir.getParent to Log.parentDir and Log.dir.getParentFile to Log.parentDirFile.
  • Only expose accessors for Log.dir and Log.parentDir.
  • Removed unused parameters in Partition.makeLeader, Partition.makeFollower and Partition.createLogIfNotExists.

@gardnervickers gardnervickers force-pushed the checkpoint-highwatermarks-alloc-1 branch 2 times, most recently from 4459fc2 to ce31895 Compare May 15, 2019 23:37
@gardnervickers
Contributor Author

retest this please

@gardnervickers gardnervickers force-pushed the checkpoint-highwatermarks-alloc-1 branch from 66cf7ad to a7a56cb Compare May 23, 2019 17:18
override def apply(t: String): util.HashMap[TopicPartition, Long] = {
return new util.HashMap[TopicPartition, Long]()
}
})
Member

We can now use a SAM here since we dropped support for Scala 2.11.
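
As a rough illustration of that suggestion, the anonymous `Function` instance can be replaced by a Scala lambda when calling `computeIfAbsent` (Scala 2.12 or later). The sketch below is an assumption about how such a call might look, not the code in this PR: `SamExample`, `Tp` and `record` are hypothetical names, and `Tp` stands in for `TopicPartition`.

```scala
import java.util

object SamExample {
  // Hypothetical key type standing in for TopicPartition.
  final case class Tp(topic: String, partition: Int)

  // Records one high watermark under its log directory.
  def record(byDir: util.HashMap[String, util.HashMap[Tp, Long]],
             dir: String, tp: Tp, hw: Long): Unit = {
    // The lambda is converted to a java.util.function.Function by SAM conversion,
    // so no anonymous class with an explicit apply method is needed.
    val perDir = byDir.computeIfAbsent(dir, (_: String) => new util.HashMap[Tp, Long]())
    perDir.put(tp, hw)
  }
}
```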

private def populateHWMMap(mapping: java.util.HashMap[String, util.HashMap[TopicPartition, Long]], optReplica: Option[Replica]): Unit = {
optReplica.foreach(replica => {
if (replica.log.isDefined) {
val dir = replica.log.get.parentDir
Member

@ijuma ijuma Feb 3, 2020

Can we rely on File instead of converting to String? That would avoid a bunch of allocation without having to cache the value.

Member

Looking at the code, it seems like getParentFile does the allocation too. So, maybe caching the parent dir is needed.

Comment thread core/src/main/scala/kafka/log/Log.scala Outdated
private val lastFlushedTime = new AtomicLong(time.milliseconds)

// Cache value of parent directory
@volatile var parentDir: String = dir.getParent
Member

Can we make the mutable field private and expose an accessor? Should this be returning a File instead of String? Are there other places where we call dir.getParent or dir.getParentFile that could use this?

@ijuma
Member

ijuma commented Feb 3, 2020

A high level comment: we do a lot of unnecessary work indexing by the log dir even though it's often 1. When it's not 1, it's a small number. If we kept a map from log dir to replicas that only gets updated when adding/removing replicas, it would reduce the amount of work by a lot. It's worth checking if this is a better path than all the micro optimizations.
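
One way to picture that alternative is an index that is updated only when replicas are added or removed, so a checkpoint pass reads it instead of regrouping every time. The sketch below is purely illustrative: `LogDirIndex` and its methods are hypothetical, partitions are represented by plain strings, and it is not thread-safe as written.

```scala
import scala.collection.mutable

// Hypothetical index from log dir to the partitions stored in it.
final class LogDirIndex {
  private val byDir = mutable.HashMap.empty[String, mutable.Set[String]]

  // Called when a replica is created in `dir`.
  def add(dir: String, partition: String): Unit =
    byDir.getOrElseUpdate(dir, mutable.LinkedHashSet.empty[String]) += partition

  // Called when a replica is deleted; empty directories are dropped from the index.
  def remove(dir: String, partition: String): Unit =
    byDir.get(dir).foreach { ps =>
      ps -= partition
      if (ps.isEmpty) byDir -= dir
    }

  def dirs: Iterable[String] = byDir.keys

  def partitionsIn(dir: String): Iterable[String] = byDir.getOrElse(dir, Nil)
}
```

With such an index, the per-checkpoint work is a lookup per directory, at the cost of keeping the index consistent on every add, remove, and directory move.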

@ijuma ijuma force-pushed the checkpoint-highwatermarks-alloc-1 branch from a7a56cb to 88bba8e Compare March 24, 2020 16:34
@ijuma
Member

ijuma commented Mar 24, 2020

retest this please

@ijuma
Member

ijuma commented Mar 24, 2020

@gardnervickers I made the following changes:

  1. Rebased and made it compile given changes in trunk since then
  2. Moved import control changes to import-control-jmh-benchmarks
  3. Removed slf4j logging change
  4. Changed checkpointHighwatermarks to use AnyRefMap and simplified it a bit.
  5. Changed all usages of Log.dir.getParent to Log.parentDir and Log.dir.getParentFile to Log.parentDirFile.
  6. Only expose accessors for Log.dir and Log.parentDir.
  7. Removed unused parameters in Partition.makeLeader, Partition.makeFollower and Partition.createLogIfNotExists.
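
For item 4, a single-pass grouping with `AnyRefMap` might look like the sketch below. It is an assumption about the general shape only, not the merged code: `HwGrouping`, `PartitionHw` and `groupByDir` are hypothetical names, and partitions are keyed by a plain string rather than `TopicPartition`.

```scala
import scala.collection.mutable

object HwGrouping {
  // Hypothetical record standing in for a partition that has a log.
  final case class PartitionHw(name: String, parentDir: String, highWatermark: Long)

  // Groups high watermarks by log directory in one pass, without intermediate collections.
  def groupByDir(parts: Iterable[PartitionHw]): mutable.AnyRefMap[String, mutable.AnyRefMap[String, Long]] = {
    val byDir = mutable.AnyRefMap.empty[String, mutable.AnyRefMap[String, Long]]
    parts.foreach { p =>
      // Create the per-directory map lazily, the first time a directory is seen.
      val hws = byDir.getOrElseUpdate(p.parentDir, mutable.AnyRefMap.empty[String, Long])
      hws.put(p.name, p.highWatermark)
    }
    byDir
  }
}
```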

@ijuma
Member

ijuma commented Mar 25, 2020

retest this please

Contributor

@lbradstreet lbradstreet left a comment

The recent changes all look good to me, but @gardnervickers should review it too.

Contributor Author

@gardnervickers gardnervickers left a comment

LGTM

ijuma pushed a commit that referenced this pull request Apr 25, 2020
For brokers with replica counts > 4000, allocations from logsByDir become
substantial. logsByDir is called often by LogManager.checkpointLogRecoveryOffsets
and LogManager.checkpointLogStartOffsets. The approach used is similar to the
one from the checkpointHighwatermarks change in
#6741.

Are there better ways to structure our data structure to avoid creating logsByDir on
demand for each checkpoint iteration? This micro-optimization will help as is, but if
we can avoid doing this completely it'd be better.

JMH benchmark results:
```
Before:
Benchmark                                                                      (numPartitions)  (numTopics)   Mode  Cnt        Score        Error   Units
CheckpointBench.measureCheckpointLogStartOffsets                                             3          100  thrpt   15        2.233 ±      0.013  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate                              3          100  thrpt   15      477.097 ±     49.731  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm                         3          100  thrpt   15   246083.007 ±     33.052    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space                     3          100  thrpt   15      475.683 ±     55.569  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm                3          100  thrpt   15   245474.040 ±  14968.328    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen                        3          100  thrpt   15        0.001 ±      0.001  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm                   3          100  thrpt   15        0.341 ±      0.268    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.count                                   3          100  thrpt   15      129.000               counts
CheckpointBench.measureCheckpointLogStartOffsets:·gc.time                                    3          100  thrpt   15       52.000                   ms
CheckpointBench.measureCheckpointLogStartOffsets                                             3         1000  thrpt   15        0.572 ±      0.004  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate                              3         1000  thrpt   15     1360.240 ±    150.539  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm                         3         1000  thrpt   15  2750221.257 ±    891.024    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space                     3         1000  thrpt   15     1362.908 ±    148.799  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm                3         1000  thrpt   15  2756395.092 ±  44671.843    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen                        3         1000  thrpt   15        0.017 ±      0.008  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm                   3         1000  thrpt   15       33.611 ±     14.401    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.count                                   3         1000  thrpt   15      273.000               counts
CheckpointBench.measureCheckpointLogStartOffsets:·gc.time                                    3         1000  thrpt   15      186.000                   ms
CheckpointBench.measureCheckpointLogStartOffsets                                             3         2000  thrpt   15        0.266 ±      0.002  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate                              3         2000  thrpt   15     1342.557 ±    171.260  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm                         3         2000  thrpt   15  5877881.729 ±   3695.086    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space                     3         2000  thrpt   15     1343.965 ±    186.069  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm                3         2000  thrpt   15  5877788.561 ± 168540.343    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen                        3         2000  thrpt   15        0.081 ±      0.043  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm                   3         2000  thrpt   15      351.277 ±    167.006    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.count                                   3         2000  thrpt   15      253.000               counts
CheckpointBench.measureCheckpointLogStartOffsets:·gc.time                                    3         2000  thrpt   15      231.000                   ms
JMH benchmarks done

After:
CheckpointBench.measureCheckpointLogStartOffsets                                             3          100  thrpt   15        2.809 ±     0.129  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate                              3          100  thrpt   15      211.248 ±    25.953  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm                         3          100  thrpt   15    86533.838 ±  3763.989    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space                     3          100  thrpt   15      211.512 ±    38.669  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm                3          100  thrpt   15    86228.552 ±  9590.781    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen                        3          100  thrpt   15       ≈ 10⁻³              MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm                   3          100  thrpt   15        0.140 ±     0.111    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.count                                   3          100  thrpt   15       57.000              counts
CheckpointBench.measureCheckpointLogStartOffsets:·gc.time                                    3          100  thrpt   15       25.000                  ms
CheckpointBench.measureCheckpointLogStartOffsets                                             3         1000  thrpt   15        1.046 ±     0.030  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate                              3         1000  thrpt   15      524.597 ±    74.793  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm                         3         1000  thrpt   15   582898.889 ± 37552.262    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space                     3         1000  thrpt   15      519.675 ±    89.754  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm                3         1000  thrpt   15   576371.150 ± 55972.955    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen                        3         1000  thrpt   15        0.009 ±     0.005  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm                   3         1000  thrpt   15        9.920 ±     5.375    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.count                                   3         1000  thrpt   15      111.000              counts
CheckpointBench.measureCheckpointLogStartOffsets:·gc.time                                    3         1000  thrpt   15       56.000                  ms
CheckpointBench.measureCheckpointLogStartOffsets                                             3         2000  thrpt   15        0.617 ±     0.007  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate                              3         2000  thrpt   15      573.061 ±    95.931  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm                         3         2000  thrpt   15  1092098.004 ± 75140.633    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space                     3         2000  thrpt   15      572.448 ±    97.960  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm                3         2000  thrpt   15  1091290.460 ± 85946.164    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen                        3         2000  thrpt   15        0.010 ±     0.012  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm                   3         2000  thrpt   15       19.990 ±    24.407    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.count                                   3         2000  thrpt   15      109.000              counts
CheckpointBench.measureCheckpointLogStartOffsets:·gc.time                                    3         2000  thrpt   15       67.000                  ms
JMH benchmarks done

```

For the 2000 topic, 3 partition case, we see a reduction in normalized allocations from 5877881B/op to 1284190.774B/op, a reduction of 78%.

Some allocation profiles from a mid sized broker follow. I have seen worse, but these
add up to around 3.8% on a broker that saw GC overhead in CPU time of around 30%.
You could argue that this is relatively small, but it seems worthwhile for a low risk change.

![image](https://user-images.githubusercontent.com/252189/79058104-33e91d80-7c1e-11ea-99c9-0cf2e3571e1f.png)
![image](https://user-images.githubusercontent.com/252189/79058105-38add180-7c1e-11ea-8bfd-6e6eafb0c794.png)

Reviewers: Ismael Juma <ismael@juma.me.uk>
efeg pushed a commit to linkedin/kafka that referenced this pull request Jun 1, 2020
…ghWatermarks, patch 1/2 (apache#6741)

TICKET =
LI_DESCRIPTION =

This PR works to improve high watermark checkpointing performance.

`ReplicaManager.checkpointHighWatermarks()` was found to be a major contributor to GC pressure, especially on Kafka clusters with high partition counts and low throughput.

Added a JMH benchmark for `checkpointHighWatermarks` which establishes a
performance baseline. The parameterized benchmark was run with 100, 1000 and
2000 topics.

Modified `ReplicaManager.checkpointHighWatermarks()` to avoid extra copies and cached
the Log parent directory String to avoid frequent allocations when calculating
`File.getParent()`.

A few clean-ups:
* Changed all usages of Log.dir.getParent to Log.parentDir and Log.dir.getParentFile to Log.parentDirFile.
* Only expose public accessor for `Log.dir` (consistent with `Log.parentDir`)
* Removed unused parameters in `Partition.makeLeader`, `Partition.makeFollower` and `Partition.createLogIfNotExists`.

Benchmark results:

| Topic Count | Ops/ms | MB/sec allocated |
|-------------|--------|------------------|
| 100         | +51%   | -91%             |
| 1000        | +143%  | -49%             |
| 2000        | +149%  | -50%             |

Reviewers: Lucas Bradstreet <lucas@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Co-authored-by: Gardner Vickers <gardner@vickers.me>
Co-authored-by: Ismael Juma <ismael@juma.me.uk>

EXIT_CRITERIA = MANUAL [""]
gitlw pushed a commit to linkedin/kafka that referenced this pull request Jun 12, 2020
gitlw pushed a commit to linkedin/kafka that referenced this pull request Jun 13, 2020