MINOR: Reduce allocations in requests via buffer caching #9229
ijuma merged 15 commits into apache:trunk
Use a caching BufferSupplier per request handler thread so that decompression buffers are cached if supported by the underlying CompressionType.

This reduces allocations significantly for LZ4 when the number of partitions is high. The decompression buffer size is typically 64 KB, so a produce request with 1000 partitions results in 64 MB of allocations even if each produce batch is small (likely, when there are so many partitions).

I did a quick producer perf local test with 5000 partitions, 1 KB record size, 1 broker, lz4 and ~0.5 for the producer compression rate metric:

Before this change:
> 20000000 records sent, 346314.349535 records/sec (330.27 MB/sec), 148.33 ms avg latency, 2267.00 ms max latency, 115 ms 50th, 383 ms 95th, 777 ms 99th, 1514 ms 99.9th.

After this change:
> 20000000 records sent, 431956.113259 records/sec (411.95 MB/sec), 117.79 ms avg latency, 1219.00 ms max latency, 99 ms 50th, 295 ms 95th, 440 ms 99th, 662 ms 99.9th.

That's a 25% throughput improvement and p999 latency was reduced to under half (in this test).
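To make the allocation arithmetic in the description concrete, here is a minimal, self-contained sketch. It is not Kafka's BufferSupplier: the `Buffers` interface, the `NoCaching` and `Caching` classes, and the `simulate` helper are hypothetical names invented for illustration. It assumes one 64 KB decompression buffer is borrowed and released per partition, one partition at a time, on a single thread.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: these types are illustrative, not Kafka's BufferSupplier.
class BufferCachingSketch {

    /** Borrow a buffer, hand it back when done, and report bytes allocated so far. */
    interface Buffers {
        ByteBuffer get(int capacity);
        void release(ByteBuffer buffer);
        long allocatedBytes();
    }

    /** Allocates on every call and retains nothing. */
    static final class NoCaching implements Buffers {
        private long allocated = 0;

        @Override public ByteBuffer get(int capacity) {
            allocated += capacity;
            return ByteBuffer.allocate(capacity);
        }

        @Override public void release(ByteBuffer buffer) { }

        @Override public long allocatedBytes() { return allocated; }
    }

    /** Keeps released buffers keyed by capacity. Not thread-safe: one instance per thread. */
    static final class Caching implements Buffers {
        private final Map<Integer, Deque<ByteBuffer>> free = new HashMap<>();
        private long allocated = 0;

        @Override public ByteBuffer get(int capacity) {
            Deque<ByteBuffer> queue = free.get(capacity);
            ByteBuffer cached = (queue == null) ? null : queue.pollFirst();
            if (cached != null) {
                return cached; // reuse: no new allocation
            }
            allocated += capacity;
            return ByteBuffer.allocate(capacity);
        }

        @Override public void release(ByteBuffer buffer) {
            buffer.clear();
            free.computeIfAbsent(buffer.capacity(), k -> new ArrayDeque<>()).addFirst(buffer);
        }

        @Override public long allocatedBytes() { return allocated; }
    }

    /** Borrows and releases one buffer per partition, sequentially. */
    static long simulate(Buffers buffers, int partitions, int bufferSize) {
        for (int i = 0; i < partitions; i++) {
            ByteBuffer buffer = buffers.get(bufferSize);
            buffers.release(buffer);
        }
        return buffers.allocatedBytes();
    }

    public static void main(String[] args) {
        int bufferSize = 64 * 1024;
        System.out.println(simulate(new NoCaching(), 1000, bufferSize)); // 65536000
        System.out.println(simulate(new Caching(), 1000, bufferSize));   // 65536
    }
}
```

With 1000 partitions the non-caching variant allocates 1000 × 64 KB (65,536,000 bytes, the "64 MB" figure above), while the caching variant allocates a single 64 KB buffer and reuses it. Because the cache is a plain map with no synchronization, it only works if each instance is confined to one thread, which is the reason for having one supplier per request handler thread.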
|
This patch makes each request (handler) thread have a This idea is good to me :) |
|
Could we use ThreadLocal to keep those thread resources, like BufferSupplier and ActionQueue, to simplify the method arguments? The cost of ThreadLocal is low, and it would make it easy to add new thread-local resources in the future (without having to change a lot of method arguments). |
|
In my opinion, thread locals are most useful when one doesn't control the code. For cases like this, being explicit makes it easier to reason about and also test. Even if it's a bit more work initially. |
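The trade-off being discussed can be sketched as follows. This is an illustration only: `HandlerState`, `handle`, `handleImplicit`, and `LOCAL` are hypothetical names and do not correspond to code in this PR. The explicit variant shows its dependency in the method signature; the ThreadLocal variant hides it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: none of these names come from Kafka.
class ExplicitStateSketch {

    /** Per-thread state, created once by the thread that owns it. */
    static final class HandlerState {
        final List<String> seen = new ArrayList<>();
    }

    /** Explicit variant: the state is a visible argument. */
    static int handle(String request, HandlerState state) {
        state.seen.add(request);
        return state.seen.size();
    }

    /** Thread-local variant: the same state is looked up implicitly. */
    static final ThreadLocal<HandlerState> LOCAL = ThreadLocal.withInitial(HandlerState::new);

    static int handleImplicit(String request) {
        HandlerState state = LOCAL.get();
        state.seen.add(request);
        return state.seen.size();
    }

    public static void main(String[] args) {
        HandlerState state = new HandlerState();
        handle("produce-1", state);
        System.out.println(handle("produce-2", state));              // 2
        // A test can pass a fresh instance; nothing needs to be reset first.
        System.out.println(handle("produce-1", new HandlerState())); // 1
        // The thread-local variant carries state between calls on the same thread.
        handleImplicit("produce-1");
        System.out.println(handleImplicit("produce-2"));             // 2
    }
}
```

In the explicit form a test controls the state simply by constructing it. In the thread-local form a test that runs on a reused thread has to remember to call `LOCAL.remove()` to get a clean slate, which is one way to read the "easier to reason about and also test" argument.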
|
@chia7712 One option would be for me to introduce a |
It is great! +1 |
|
@ijuma I have closed #9220 and assigned https://issues.apache.org/jira/browse/KAFKA-10433 to you. |
|
@ijuma I filed a PR to zstd-jni that opens the door to reusing the byte array used for zstd compression (luben/zstd-jni@1346fc1). There is also a ticket (https://issues.apache.org/jira/browse/KAFKA-10470) to adopt the new zstd-jni API. |
…e-allocations-lz4 * apache-github/trunk: (562 commits) MINOR: remove unused code from MessageTest (apache#9961) MINOR: Fix visibility of Log.{unflushedMessages, addSegment} methods (apache#9966) KAFKA-12229: Restore original class loader in integration tests using EmbeddedConnectCluster during shutdown (apache#9942) KAFKA-12190: Fix setting of file permissions on non-POSIX filesystems (apache#9947) MINOR: Remove `toStruct` and `fromStruct` methods from generated protocol classes (apache#9960) MINOR: Fix typo in Utils#toPositive (apache#9943) MINOR: MessageUtil: remove some deadcode (apache#9931) MINOR: Update zstd-jni to 1.4.8-2 (apache#9957) MINOR: Revert assertion in MockProducerTest (apache#9956) MINOR: Optimize assertions in unit tests (apache#9955) MINOR: Tag `RaftEventSimulationTest` as `integration` and tweak it (apache#9925) MINOR: Update to Gradle 6.8.1 (apache#9953) MINOR: A few small group coordinator cleanups (apache#9952) MINOR: Upgrade ducktape to version 0.8.1 (apache#9933) MINOR: fix record time in test shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing (apache#9948) MINOR: Restore interrupt status when closing (apache#9863) KAFKA-10357: Extract setup of repartition topics from Streams partition assignor (apache#9848) KAFKA-12212; Bump Metadata API version to remove `ClusterAuthorizedOperations` fields (KIP-700) (apache#9945) MINOR: log 2min processing summary of StreamThread loop (apache#9941) MINOR: Drop enable.metadata.quorum config (apache#9934) ...
|
@chia7712 One question we have to decide is whether we want to remove the default arguments in this PR or in a separate PR that is purely mechanical (no behavior changes). A lot of tests call the relevant methods, so removing the defaults would cause a lot of test changes. I am leaning towards doing it as a separate PR and maybe after the 2.8 branch is cut (to avoid disrupting other work targeting 2.8). What do you think? |
+1 |
@ijuma Are you leaning toward implementing this? It can help us improve "action queue". |
|
@chia7712 I had forgotten that part of the discussion. :) Let me take a closer look at that. |
…e-allocations-lz4 * apache-github/trunk: (118 commits) KAFKA-12327: Remove MethodHandle usage in CompressionType (apache#10123) KAFKA-12297: Make MockProducer return RecordMetadata with values as per contract MINOR: Update zstd and use classes with no finalizers (apache#10120) KAFKA-12326: Corrected regresion in MirrorMaker 2 executable introduced with KAFKA-10021 (apache#10122) KAFKA-12321 the comparison function for uuid type should be 'equals' rather than '==' (apache#10098) MINOR: Add FetchSnapshot API doc in KafkaRaftClient (apache#10097) MINOR: KIP-631 KafkaConfig fixes and improvements (apache#10114) KAFKA-12272: Fix commit-interval metrics (apache#10102) MINOR: Improve confusing admin client shutdown logging (apache#10107) MINOR: Add BrokerMetadataListener (apache#10111) MINOR: Support Raft-based metadata quorums in system tests (apache#10093) MINOR: add the MetaLogListener, LocalLogManager, and Controller interface. (apache#10106) MINOR: Introduce the KIP-500 Broker lifecycle manager (apache#10095) MINOR: Remove always-passing validation in TestRecordTest#testProducerRecord (apache#9930) KAFKA-5235: GetOffsetShell: Support for multiple topics and consumer configuration override (KIP-635) (apache#9430) MINOR: Prevent creating partition.metadata until ID can be written (apache#10041) MINOR: Add RaftReplicaManager (apache#10069) MINOR: Add ClientQuotaMetadataManager for processing QuotaRecord (apache#10101) MINOR: Rename DecommissionBrokers to UnregisterBrokers (apache#10084) MINOR: KafkaBroker.brokerState should be volatile instead of AtomicReference (apache#10080) ... clients/src/main/java/org/apache/kafka/common/record/CompressionType.java core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
|
We already have a |
RequestLocal sounds good to me
…e-allocations-lz4 * apache-github/trunk: (243 commits) KAFKA-12590: Remove deprecated kafka.security.auth.Authorizer, SimpleAclAuthorizer and related classes in 3.0 (apache#10450) KAFKA-3968: fsync the parent directory of a segment file when the file is created (apache#10405) KAFKA-12283: disable flaky testMultipleWorkersRejoining to stabilize build (apache#10408) MINOR: remove KTable.to from the docs (apache#10464) MONOR: Remove redudant LocalLogManager (apache#10325) MINOR: support ImplicitLinkedHashCollection#sort (apache#10456) KAFKA-12587 Remove KafkaPrincipal#fromString for 3.0 (apache#10447) KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager (apache#10282) MINOR: Improve reproducability of raft simulation tests (apache#10422) KAFKA-12474: Handle failure to write new session keys gracefully (apache#10396) KAFKA-12593: Fix Apache License headers (apache#10452) MINOR: Fix typo in MirrorMaker v2 documentation (apache#10433) KAFKA-12600: Remove deprecated config value `default` for client config `client.dns.lookup` (apache#10458) KAFKA-12952: Remove deprecated LogConfig.Compact (apache#10451) Initial commit (apache#10454) KAFKA-12575: Eliminate Log.isLogDirOffline boolean attribute (apache#10430) KAFKA-8405; Remove deprecated `kafka-preferred-replica-election` command (apache#10443) MINOR: Fix docs for end-to-end record latency metrics (apache#10449) MINOR Replaced File with Path in LogSegmentData. (apache#10424) KAFKA-12583: Upgrade netty to 4.1.62.Final ...
ab60692 to 3e5b48a
3e5b48a to adac142
|
@chia7712 I introduced
Thoughts? |
That LGTM. For another, the memory utils used by |
@chia7712 Yes, it's worth exploring. I think |
|
@chia7712 I updated the PR. Please review when you have a chance. |
Yes, that is an interesting issue to me. I filed a JIRA (https://issues.apache.org/jira/browse/KAFKA-12627). That can be a follow-up to this PR. |
import org.apache.kafka.common.utils.BufferSupplier

object RequestLocal {
  val NoCaching: RequestLocal = RequestLocal(BufferSupplier.create())
NoCaching should use BufferSupplier.NO_CACHING, right?
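The concern in this review comment can be illustrated with a small sketch. It is written in Java with hypothetical names (`Buffers`, `newCaching`, `perThreadCaching`, `reusesBuffer`) and a simplified one-slot cache, so it is not the actual RequestLocal or BufferSupplier code. The assumption it demonstrates is that a shared constant named "no caching" should wrap a stateless supplier, while a caching supplier is stateful and should be created per thread.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: names echo the discussion but this is not Kafka's code.
class RequestLocalSketch {

    interface Buffers {
        ByteBuffer get(int capacity);
        void release(ByteBuffer buffer);
    }

    /** Stateless, so a single instance can safely be shared by every thread. */
    static final Buffers NO_CACHING = new Buffers() {
        @Override public ByteBuffer get(int capacity) { return ByteBuffer.allocate(capacity); }
        @Override public void release(ByteBuffer buffer) { }
    };

    /** Remembers the last released buffer. Stateful, so one instance per thread. */
    static Buffers newCaching() {
        return new Buffers() {
            private ByteBuffer cached;

            @Override public ByteBuffer get(int capacity) {
                if (cached != null && cached.capacity() == capacity) {
                    ByteBuffer buffer = cached;
                    cached = null;
                    return buffer;
                }
                return ByteBuffer.allocate(capacity);
            }

            @Override public void release(ByteBuffer buffer) {
                buffer.clear();
                cached = buffer;
            }
        };
    }

    /** Request-scoped holder that is passed explicitly to handlers. */
    static final class RequestLocal {
        /** Shared constant: backed by the stateless supplier, as its name promises. */
        static final RequestLocal NO_CACHING_LOCAL = new RequestLocal(NO_CACHING);

        final Buffers buffers;

        RequestLocal(Buffers buffers) { this.buffers = buffers; }

        /** Each handler thread would call this once and keep the result to itself. */
        static RequestLocal perThreadCaching() { return new RequestLocal(newCaching()); }
    }

    /** True if a released buffer is handed out again on the next request. */
    static boolean reusesBuffer(RequestLocal local) {
        ByteBuffer first = local.buffers.get(1024);
        local.buffers.release(first);
        return local.buffers.get(1024) == first;
    }

    public static void main(String[] args) {
        System.out.println(reusesBuffer(RequestLocal.NO_CACHING_LOCAL));   // false
        System.out.println(reusesBuffer(RequestLocal.perThreadCaching())); // true
    }
}
```

If the shared constant were built from the caching factory instead, `reusesBuffer` would return true for it, and the unsynchronized cache would be reachable from every thread that uses the constant.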
    apis.handle(request, requestLocal)
  } catch {
    case e: FatalExitError =>
      shutdownComplete.countDown()
Should we call completeShutdown() rather than shutdownComplete.countDown()?
|
@chia7712 When you say performance test, you mean the ducktape ones? |
yep. maybe |
…e-allocations-lz4 * apache-github/trunk: (155 commits) KAFKA-12728: Upgrade gradle to 7.0.2 and shadow to 7.0.0 (apache#10606) KAFKA-12778: Fix QuorumController request timeouts and electLeaders (apache#10688) KAFKA-12754: Improve endOffsets for TaskMetadata (apache#10634) Rework on KAFKA-3968: fsync the parent directory of a segment file when the file is created (apache#10680) MINOR: set replication.factor to 1 to make StreamsBrokerCompatibilityService work with old broker (apache#10673) MINOR: prevent cleanup() from being called while Streams is still shutting down (apache#10666) KAFKA-8326: Introduce List Serde (apache#6592) KAFKA-12697: Add Global Topic and Partition count metrics to the Quorum Controller (apache#10679) KAFKA-12648: MINOR - Add TopologyMetadata.Subtopology class for subtopology metadata (apache#10676) MINOR: Update jacoco to 0.8.7 for JDK 16 support (apache#10654) MINOR: exclude all `src/generated` and `src/generated-test` (apache#10671) KAFKA-12772: Move all transaction state transition rules into their states (apache#10667) KAFKA-12758 Added `server-common` module to have server side common classes. (apache#10638) MINOR Removed copying storage libraries specifically as they are already copied. (apache#10647) KAFKA-5876: KIP-216 Part 4, Apply InvalidStateStorePartitionException for Interactive Queries (apache#10657) KAFKA-12747: Fix flakiness in shouldReturnUUIDsWithStringPrefix (apache#10643) MINOR: remove unnecessary placeholder from WorkerSourceTask#recordSent (apache#10659) MINOR: Remove unused `scalatest` definition from `dependencies.gradle` (apache#10655) MINOR: checkstyle version upgrade: 8.20 -> 8.36.2 (apache#10656) KAFKA-12464: minor code cleanup and additional logging in constrained sticky assignment (apache#10645) ...
…e-allocations-lz4 * apache-github/trunk: (43 commits) KAFKA-12800: Configure generator to fail on trailing JSON tokens (apache#10717) MINOR: clarify message ordering with max in-flight requests and idempotent producer (apache#10690) MINOR: Add log identifier/prefix printing in Log layer static functions (apache#10742) MINOR: update java doc for deprecated methods (apache#10722) MINOR: Fix deprecation warnings in SlidingWindowedCogroupedKStreamImplTest (apache#10703) KAFKA-12499: add transaction timeout verification (apache#10482) KAFKA-12620 Allocate producer ids on the controller (apache#10504) MINOR: Kafka Streams code samples formating unification (apache#10651) KAFKA-12808: Remove Deprecated Methods under StreamsMetrics (apache#10724) KAFKA-12522: Cast SMT should allow null value records to pass through (apache#10375) KAFKA-12820: Upgrade maven-artifact dependency to resolve CVE-2021-26291 HOTFIX: fix checkstyle issue in KAFKA-12697 KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller (apache#10572) KAFKA-12342: Remove MetaLogShim and use RaftClient directly (apache#10705) KAFKA-12779: KIP-740, Clean up public API in TaskId and fix TaskMetadata#taskId() (apache#10735) KAFKA-12814: Remove Deprecated Method StreamsConfig getConsumerConfigs (apache#10737) MINOR: Eliminate redundant functions in LogTest suite (apache#10732) MINOR: Remove unused maxProducerIdExpirationMs parameter in Log constructor (apache#10723) MINOR: Updating files with release 2.7.1 (apache#10660) KAFKA-12809: Remove deprecated methods of Stores factory (apache#10729) ...
|
This PR: trunk: |
|
The results are similar for the ducktape benchmarks since the bottleneck is elsewhere. In the PR description, I include the results for a workload that shows significant improvement with these changes. Also, the allocation profiles in the images below show that the lz4 buffer allocations dominate on trunk and are gone in this PR. So I think we can go ahead and merge this. |


Use a caching BufferSupplier per request handler thread so that
decompression buffers are cached if supported by the underlying
CompressionType. This achieves a similar outcome as #9220, but
with less contention.
We introduce a RequestLocal class to make it easier to introduce
new request scoped stateful instances (one example we discussed
previously was an ActionQueue that could be used to avoid
some of the complex group coordinator locking).
This is a small win for zstd (no synchronization or soft references) and
a more significant win for lz4. In particular, it reduces allocations
significantly when the number of partitions is high. The decompression
buffer size is typically 64 KB, so a produce request with 1000 partitions
results in 64 MB of allocations even if each produce batch is small (likely,
when there are so many partitions).
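I did a quick producer perf local test with 5000 partitions, 1 KB record
size, 1 broker, lz4 and ~0.5 for the producer compression rate metric:
Before this change:
> 20000000 records sent, 346314.349535 records/sec (330.27 MB/sec), 148.33 ms avg latency, 2267.00 ms max latency, 115 ms 50th, 383 ms 95th, 777 ms 99th, 1514 ms 99.9th.
After this change:
> 20000000 records sent, 431956.113259 records/sec (411.95 MB/sec), 117.79 ms avg latency, 1219.00 ms max latency, 99 ms 50th, 295 ms 95th, 440 ms 99th, 662 ms 99.9th.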
That's a 25% throughput improvement and p999 latency was reduced to
under half (in this test).
Default arguments will be removed in a subsequent PR.