KAFKA-10433 Reuse the ByteBuffer in validating compressed records #9220
chia7712 wants to merge 1 commit into apache:trunk from
Conversation
@chia7712 Thanks for the PR. The intent is good, but I think the approach should be a bit different. As it happens, I have implemented this other approach. Would you be OK if I submit that as a PR and we can compare?
Please feel free to submit another PR. We all love to see a better solution :)
Closed as there is a better approach (#9229).
Use a caching `BufferSupplier` per request handler thread so that decompression buffers are cached if supported by the underlying `CompressionType`. This achieves a similar outcome as #9220, but with less contention. We introduce a `RequestLocal` class to make it easier to introduce new request-scoped stateful instances (one example we discussed previously was an `ActionQueue` that could be used to avoid some of the complex group coordinator locking).

This is a small win for zstd (no synchronization or soft references) and a more significant win for lz4. In particular, it reduces allocations significantly when the number of partitions is high. The decompression buffer size is typically 64 KB, so a produce request with 1000 partitions results in 64 MB of allocations even if each produce batch is small (likely, when there are so many partitions).

I did a quick producer perf local test with 5000 partitions, 1 KB record size, 1 broker, lz4 and ~0.5 for the producer compression rate metric.

Before this change:

> 20000000 records sent, 346314.349535 records/sec (330.27 MB/sec), 148.33 ms avg latency, 2267.00 ms max latency, 115 ms 50th, 383 ms 95th, 777 ms 99th, 1514 ms 99.9th.

After this change:

> 20000000 records sent, 431956.113259 records/sec (411.95 MB/sec), 117.79 ms avg latency, 1219.00 ms max latency, 99 ms 50th, 295 ms 95th, 440 ms 99th, 662 ms 99.9th.

That's a 25% throughput improvement, and p999 latency was reduced to less than half (in this test). Default arguments will be removed in a subsequent PR.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
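The per-thread caching described in the commit message can be sketched roughly as follows. This is a simplified, hypothetical stand-in for the actual `RequestLocal`/`BufferSupplier` classes in the Kafka codebase (the class and method names here are invented for illustration): each request handler thread owns its own cache, so buffers are reused across decompressions without any synchronization.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical simplified sketch of request-handler-thread-confined buffer
// caching; the real code wraps a caching BufferSupplier inside RequestLocal.
final class ThreadConfinedBufferCache {
    // One cache per thread: no locks, no contention between handler threads.
    private static final ThreadLocal<ThreadConfinedBufferCache> PER_THREAD =
            ThreadLocal.withInitial(ThreadConfinedBufferCache::new);

    private final Deque<ByteBuffer> cache = new ArrayDeque<>();

    static ThreadConfinedBufferCache current() {
        return PER_THREAD.get();
    }

    // Return a cached buffer if one is large enough, otherwise allocate.
    ByteBuffer get(int capacity) {
        ByteBuffer cached = cache.pollFirst();
        if (cached != null && cached.capacity() >= capacity) {
            cached.clear();
            return cached;
        }
        return ByteBuffer.allocate(capacity);
    }

    // Hand the buffer back so the next decompression on this thread reuses it.
    void release(ByteBuffer buffer) {
        cache.addFirst(buffer);
    }
}
```

Because the cache is confined to the calling thread, `get`/`release` need no synchronization or soft references, which is where the win over a shared pool comes from.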
@chia7712: We filed KAFKA-15674 to revisit this. The current implementation (#9229) passes a thread-unsafe data structure around to multiple places such as the ReplicaManager, GroupCoordinator, TxnCoordinator, etc. Each of those places has logic to add callbacks. If one is not careful (as in KAFKA-15674), the thread-unsafe data structure could be captured in those callbacks, which leads to subtle concurrency issues. It is probably safer to replace the thread-unsafe data structure with the thread-safe one you had in this PR. Would you be interested in resurrecting this PR? If there is no significant performance impact, we could just take the approach in this PR. cc @ijuma
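The hazard described above can be made concrete with a small, hypothetical illustration (the names below are invented, not Kafka code): a callback that closes over request-scoped, thread-confined state may later be executed by a different thread, silently breaking the confinement assumption that made the unsynchronized cache safe.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo: request-scoped state captured in a callback escapes
// the thread it was confined to, e.g. when a delayed operation is completed
// by another thread.
final class CallbackEscapeDemo {
    static boolean callbackRanOnOwnerThread() {
        long ownerThreadId = Thread.currentThread().getId();
        StringBuilder requestScopedState = new StringBuilder(); // not thread-safe

        AtomicLong callbackThreadId = new AtomicLong();
        Runnable callback = () -> {
            requestScopedState.append("completed"); // touches confined state
            callbackThreadId.set(Thread.currentThread().getId());
        };

        // Simulate the callback being completed by some other thread.
        Thread other = new Thread(callback);
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        return callbackThreadId.get() == ownerThreadId;
    }
}
```

Here the callback runs on a different thread than the one owning the state, so any unsynchronized mutation inside it is a latent data race — which is the class of bug KAFKA-15674 describes.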
issue: https://issues.apache.org/jira/browse/KAFKA-10433
This is a hot method, so reusing the ByteBuffer can significantly reduce memory usage if the compression type supports a BufferSupplier.
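To make the allocation saving concrete, here is a hypothetical, self-contained illustration (not the actual validation code) comparing a fresh 64 KB decompression buffer per batch against reusing one buffer across batches, as a BufferSupplier enables:

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of why buffer reuse matters in this hot path:
// validating N compressed batches with a fresh 64 KB buffer each allocates
// N buffers, while reuse needs only one.
final class AllocationDemo {
    static int allocationsWithoutReuse(int batches) {
        int allocations = 0;
        for (int i = 0; i < batches; i++) {
            ByteBuffer buffer = ByteBuffer.allocate(64 * 1024); // fresh per batch
            allocations++;
        }
        return allocations;
    }

    static int allocationsWithReuse(int batches) {
        int allocations = 0;
        ByteBuffer cached = null; // stands in for the supplier's cached buffer
        for (int i = 0; i < batches; i++) {
            ByteBuffer buffer;
            if (cached != null) {
                cached.clear(); // reuse the previously released buffer
                buffer = cached;
            } else {
                buffer = ByteBuffer.allocate(64 * 1024);
                allocations++;
            }
            cached = buffer; // "release" back for the next batch
        }
        return allocations;
    }
}
```

With 1000 batches this is 1000 allocations (about 64 MB) versus a single one, matching the arithmetic in the #9229 commit message.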
experiment