KAFKA-9039: Optimize ReplicaFetcher fetch path#7443

Merged
guozhangwang merged 12 commits into apache:trunk from lbradstreet:replica-fetcher-performance
Oct 16, 2019

Conversation

@lbradstreet
Contributor

@lbradstreet lbradstreet commented Oct 4, 2019

Improves the performance of the replica fetcher for high partition count fetch requests, where a majority of the partitions did not update between fetch requests. All benchmarks were run on an r5.xlarge.

Vanilla
Benchmark                                  (partitionCount)  Mode  Cnt        Score       Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt   15    26491.825 ±   438.463  ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt   15   153941.952 ±  4337.073  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt   15   339868.602 ±  4201.462  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt   15  2588878.448 ± 22172.482  ns/op

From 100 to 5000 partitions the latency increase is 2588878.448 / 26491.825 = 97X.

Avoid gettimeofday calls in steady state fetch states
8545888

Benchmark                                  (partitionCount)  Mode  Cnt        Score      Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt   15    22685.381 ±  267.727  ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt   15   113622.521 ± 1854.254  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt   15   273698.740 ± 9269.554  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt   15  2189223.207 ± 1706.945  ns/op

From 100 to 5000 partitions the latency increase is 2189223.207 / 22685.381 = 97X

Avoid copying partition states to maintain fetch offsets
29fdd60

Benchmark                                  (partitionCount)  Mode  Cnt        Score      Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt   15    17039.989 ±  609.355  ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt   15    99371.086 ± 1833.256  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt   15   216071.333 ± 3714.147  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt   15  2035678.223 ± 5195.232  ns/op

From 100 to 5000 partitions the latency increase is 2035678.223 / 17039.989 = 119X

Keep lag alongside PartitionFetchState to avoid expensive isReplicaInSync check
0e57e3e

Benchmark                                  (partitionCount)  Mode  Cnt        Score      Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt   15    15131.684 ±  382.088  ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt   15    86813.843 ± 3346.385  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt   15   193050.381 ± 3281.833  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt   15  1801488.513 ± 2756.355  ns/op

From 100 to 5000 partitions the latency increase is 1801488.513 / 15131.684 = 119X

Fetch session optimizations (mostly presizing the next hashmap, and avoiding making a copy of sessionPartitions, as a deep copy is not required for the ReplicaFetcher)
2614b24

Benchmark                                  (partitionCount)  Mode  Cnt        Score      Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt   15    11386.203 ±  416.701  ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt   15    60820.292 ± 3163.001  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt   15   146242.158 ± 1937.254  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt   15  1366768.926 ± 3305.712  ns/op

From 100 to 5000 partitions the latency increase is 1366768.926 / 11386.203 = 120X

We already have the partition fetch data in the fetch session, so a copy
is not required.

Keep lag alongside PartitionFetchState to avoid the expensive
isReplicaInSync check. Reduce the cost of updating the map by not
unnecessarily wrapping in ClientIdTopicPartition in FetcherStats.
@lbradstreet
Contributor Author

lbradstreet commented Oct 4, 2019

According to profiling, a majority of the remaining time goes to looking up the logStartOffset for each partition on each pass (https://github.com/apache/kafka/pull/7443/files#diff-a8437f241a5ae585d5805ee769313080R252), and to updating the incremental fetch session (https://github.com/apache/kafka/pull/7443/files#diff-a8437f241a5ae585d5805ee769313080R253). If we can drop those further, we may be able to save another 50-75%.

    fetcherLagMetrics.lag <= 0
  else
    false

stats.getAndMaybePut(topicPartition)
Contributor Author

Wrapping each put in a ClientIdTopicPartition served no purpose here, as metricId is fixed for every instantiation of FetcherLagStats. This adds additional object creation/GC overhead, as well as additional hashing overhead.
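To illustrate the idea, here is a simplified, hypothetical sketch (the types below are stripped-down stand-ins, not the actual Kafka classes): keying the stats map by the partition itself avoids allocating and hashing a wrapper object on every update.

```scala
import scala.collection.concurrent.TrieMap

// Simplified model of the FetcherLagStats change. clientId is fixed per
// instance, so wrapping every lookup key in ClientIdTopicPartition only
// added allocation and hashing overhead.
final case class TopicPartition(topic: String, partition: Int)

class FetcherLagStats(clientId: String) {
  // After the change: key directly by TopicPartition.
  private val stats = TrieMap.empty[TopicPartition, Long]

  def getAndMaybePut(tp: TopicPartition): Long =
    stats.getOrElseUpdate(tp, 0L)
}

val lagStats = new FetcherLagStats("replica-fetcher-0")
assert(lagStats.getAndMaybePut(TopicPartition("topic-a", 0)) == 0L)
```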

val fetchState = fetchStates(topicPartition)
if (fetchState.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
val fetchPartitionData = sessionPartitions.get(topicPartition)
if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
Contributor Author

This could be cleaned up a little.

val partitionsWithError = mutable.Set[TopicPartition]()

val builder = fetchSessionHandler.newBuilder()
val builder = fetchSessionHandler.newBuilder(partitionMap.size, false)
Contributor Author

Use the new builder which presizes the PartitionData hashmap, and which does not make another copy of sessionPartitions. We will not be modifying sessionPartitions, or using it again after we have processed the fetch request, so we do not need to make an unnecessary copy.
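As a rough illustration of the presizing half of this change (hypothetical helper, not the real FetchSessionHandler API): choosing the initial capacity from the expected entry count means the map never rehashes while it is being filled.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}

// Capacity chosen so that `expected` insertions never trigger a resize
// under the default 0.75 load factor.
def presized[K, V](expected: Int): JLinkedHashMap[K, V] =
  new JLinkedHashMap[K, V](math.ceil(expected / 0.75).toInt, 0.75f)

val next = presized[String, Long](1000)
(0 until 1000).foreach(i => next.put(s"topic-$i", i.toLong))
assert(next.size == 1000)
```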

.toForget(fetchData.toForget)
.metadata(fetchData.metadata)
Some(requestBuilder)
Some(ReplicaFetch(fetchData.sessionPartitions(), requestBuilder))
Contributor Author

By passing through sessionPartitions we can avoid making another copy of the PartitionMap in https://github.com/apache/kafka/pull/7443/files#diff-2d03a5d4349a23e56c09b4e773055249L119. sessionPartitions contains all of the PartitionData that we need to process the fetch response.
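The effect can be sketched as follows (simplified, hypothetical types; the real ReplicaFetch carries a request builder and the session's PartitionData map): the result bundles a reference to the existing session map, not a copy of it.

```scala
final case class PartitionData(fetchOffset: Long)

// Bundles the request with a *reference* to the session's partition map.
final case class ReplicaFetch(
  partitionData: java.util.Map[String, PartitionData],
  requestDescription: String)

val sessionPartitions = new java.util.LinkedHashMap[String, PartitionData]()
sessionPartitions.put("topic-0", PartitionData(7L))

val fetch = ReplicaFetch(sessionPartitions, "incremental-fetch")
// Response handling sees the same underlying map: no per-fetch deep copy.
assert(fetch.partitionData eq sessionPartitions)
```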

val isReplicaInSync = fetcherLagStats.isReplicaInSync(topicPartition)
!isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
!fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
Contributor Author

We avoid a HashMap lookup here by including the lag in the PartitionFetchState directly.
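A minimal sketch of the idea (field names follow the diff above, but this is an illustrative model, not the actual implementation): with lag stored on the state itself, the in-sync check becomes a field read.

```scala
// Lag is carried on the fetch state; None means the lag is not yet known.
final case class PartitionFetchState(fetchOffset: Long, lag: Option[Long]) {
  def isReplicaInSync: Boolean = lag.exists(_ <= 0)
}

assert(PartitionFetchState(100L, Some(0L)).isReplicaInSync)
assert(!PartitionFetchState(100L, Some(5L)).isReplicaInSync)
assert(!PartitionFetchState(100L, None).isReplicaInSync)
```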

def isTruncating: Boolean = state == Truncating && !isDelayed

def isDelayed: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) > 0
def isDelayed: Boolean = delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)
@lbradstreet lbradstreet (Contributor Author) Oct 4, 2019

Avoid an unnecessary gettimeofday when partitions are in Fetching state (the common case) by using an Option[DelayedItem].
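A self-contained sketch of this change (with a simplified stand-in for DelayedItem): when delay is None, the common Fetching case, isDelayed returns without ever reading the clock.

```scala
import java.util.concurrent.TimeUnit

// Minimal stand-in for Kafka's DelayedItem.
class DelayedItem(delayMs: Long) {
  private val dueNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs)
  def getDelay(unit: TimeUnit): Long =
    unit.convert(dueNs - System.nanoTime(), TimeUnit.NANOSECONDS)
}

final case class PartitionFetchState(delay: Option[DelayedItem]) {
  // None short-circuits: no clock call in the steady fetching state.
  def isDelayed: Boolean = delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)
}

assert(!PartitionFetchState(None).isDelayed)
assert(PartitionFetchState(Some(new DelayedItem(60000L))).isDelayed)
```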

@guozhangwang
Contributor

Thanks for the combo-patch @lbradstreet ! I think this is very helpful. A couple of thoughts off the top of my head:

  1. in the baseline, from 100 to 5000 partitions the latency increased by about 97X; in the PR that contains all optimizations, although the absolute latency is reduced, from 100 to 5000 the relative ratio is actually 120X, which is a bit weird. Ideally I'd expect we can achieve close to 50X, i.e. linear to num.partitions.

  2. We should probably look into all the INFO level log4j entries to avoid any expensive evaluations, e.g. the one below

info(s"Beginning/resuming copy of partition $tp from offset ${fetchState.fetchOffset}. " +
        s"Including this partition, there are ${partitionMap.size} remaining partitions to copy by this thread.")

partitionMap.size in Scala is O(n), although the passed-in map is a LinkedHashMap whose size call is O(1). Honestly I do not know which one would be triggered, but worth reducing it to be safe.

  3. One bold idea I have is that in the SessionHandler we can be even smarter, e.g. not include all the partitions: if we found that the previous request only returned data for N partitions out of all, then in the next buildFetch we could just fetch for the next N partitions.

@guozhangwang
Contributor

For 1) I've just updated the description to include the relative ratio of each optimization, it seems that 29fdd60 makes it less scalable with the num.partitions since it's only useful with small num.partitions.

@guozhangwang
Contributor

  4. Another wild thought: in doWork we can comment out maybeTruncate and see if that alone is taking a long time compared with the baseline, since in practice maybeTruncate should mostly be a no-op; if it is itself taking a lot of time, we can also make another thorough pass on that function. I can run the benchmark from your branch if you think it's a good idea.

@lbradstreet
Contributor Author

For 1) I've just updated the description to include the relative ratio of each optimization, it seems that 29fdd60 makes it less scalable with the num.partitions since it's only useful with small num.partitions.

Thanks! I'm assuming that you're OK with it still, because it still helps across the board? It just helps less for the high partition count case?

@lbradstreet
Contributor Author

Is it always better to construct a new map updated than updating next directly?

It's a good question, and I'm not sure. The idea was that by making next smaller we could make the lookups progressively quicker as we process sessionPartitions. It did seem to improve performance for our benchmark. I forgot to mention the second optimization it allows us to make, f0bf23a. That helps reduce containsKey checks, and is worth keeping if we leave the change to keep the second map in. If not, we will need to remove it again.

@guozhangwang
Contributor

Thanks! I'm assuming that you're OK with it still, because it still helps across the board? It just helps less for the high partition count case?

Yeah that's okay with me still. Just trying to point it out that with large number of partitions there seems other dominating factors.

We will get better performance improvements by redesigning how
incremental fetch sessions work in the Replica Fetcher. Minimizing
differences in the fetch session will minimize risks of the changeset.
@lbradstreet
Contributor Author

@guozhangwang I removed (32e30ac) most of the risky FetchSessionHandler changes that only showed up in the FetchSessionHandlerBenchmark, as I think we should redesign how the Replica Fetcher incremental fetch session is maintained rather than trying to optimize it further.

I re-ran the benchmark on the r5.xlarge and it shows very little difference vs the previous commit.

Benchmark                                  (partitionCount)  Mode  Cnt        Score      Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt   15    11534.716 ±  327.903  ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt   15    58781.785 ± 2209.587  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt   15   151653.533 ±  400.809  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt   15  1359152.056 ±  912.209  ns/op

@guozhangwang guozhangwang changed the title WIP: improve replica fetcher performance for high partition count clusters KAFKA-9039: improve replica fetcher performance for high partition count clusters Oct 14, 2019
@guozhangwang guozhangwang changed the title KAFKA-9039: improve replica fetcher performance for high partition count clusters KAFKA-9039: Improve replica fetcher performance for high partition count clusters Oct 14, 2019
Contributor

@guozhangwang guozhangwang left a comment

LGTM. I do not have further comments except the fix for Scala 2.13 above. I will try to run the benchmark included in your PR on my local MBP as well to confirm its effects before merging.

* incremental fetch requests (see below).
*/
private LinkedHashMap<TopicPartition, PartitionData> next = new LinkedHashMap<>();
private LinkedHashMap<TopicPartition, PartitionData> next;
Contributor

I'd suggest we make one step further, to just make the sessionPartitions a local variable only: this is because in later handling logic after the response is sent back, we only need sessionPartitions.keySet / size only, which we could maintain separately as a member field which is populated after the build() call. WDYT?

Contributor

NVM I realized now you also need the sessionPartitions for saving one fetch state copying as well.

@guozhangwang
Contributor

I've run the benchmark from this PR on my laptop and here's the results:

Baseline (trunk)

ReplicaFetcherThreadBenchmark.testFetcher               100  avgt   15    19845.007 ±    95.308  ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt   15   131247.995 ±   777.615  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt   15   290156.189 ±  1590.831  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt   15  2401985.724 ± 45898.838  ns/op

For 100/500/1000/5000 partitions, the cost scale factor (i.e. score per partition, normalized to the 100-partition case) is 1 / 1.32 / 1.46 / 2.42.

This PR:

ReplicaFetcherThreadBenchmark.testFetcher               100  avgt   15     9480.761 ±   303.979  ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt   15    67359.693 ±  4235.383  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt   15   155223.743 ±  2936.436  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt   15  1297303.182 ± 11500.040  ns/op

The latency reduction ranges between 50% and 65%.

Interestingly, for 100/500/1000/5000 partitions, the cost scale factor (i.e. score per partition, normalized to the 100-partition case) is 1 / 1.42 / 1.64 / 2.72, which is higher than the baseline. It means the optimizations we've done in this PR benefit scenarios with fewer partitions more than those with a larger num.partitions.
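The scale factors quoted here can be reproduced from the PR-branch scores above (score per partition, normalized to the 100-partition case); the last factor computes to roughly 2.74 rather than 2.72, depending on rounding:

```scala
val partitions = Seq(100, 500, 1000, 5000)
val scoresNs   = Seq(9480.761, 67359.693, 155223.743, 1297303.182)

// ns/op per partition, then normalize to the 100-partition case.
val perPartition = scoresNs.zip(partitions).map { case (s, p) => s / p }
val factors      = perPartition.map(_ / perPartition.head)

assert(factors.head == 1.0)
assert(math.abs(factors(1) - 1.42) < 0.01)
assert(math.abs(factors(2) - 1.64) < 0.01)
assert(math.abs(factors(3) - 2.74) < 0.01)
```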

Overall, I still think we can merge the PR as-is and continue the next optimizations on top of it.

@lbradstreet
Contributor Author

Is it always better to construct a new map updated than updating next directly?

Resolved by 32e30ac

@lbradstreet
Contributor Author

Build failures are due to broken trunk (#7521)

@lbradstreet
Contributor Author

retest this please

fetchRequest: FetchRequest.Builder): Unit = {
val partitionsWithError = mutable.Set[TopicPartition]()
var responseData: Seq[(TopicPartition, FetchData)] = Seq.empty
var responseData: Map[TopicPartition, FetchData] = Map.empty
Contributor

+100

Contributor

@junrao junrao left a comment

@lbradstreet : Thanks for the PR. Looks good overall. A few comments below.

Comment thread core/src/main/scala/kafka/server/AbstractFetcherThread.scala
val partitionsWithError = mutable.Set[TopicPartition]()

val builder = fetchSessionHandler.newBuilder()
val builder = fetchSessionHandler.newBuilder(partitionMap.size, false)
Contributor

Hmm, here, we choose not to copy sessionPartitions in the builder. However, in line 265 and line 273, we still read sessionPartitions. Is that correct?

Contributor Author

I believe this is still correct. We do not copy sessionPartitions, but sessionPartitions will also not be modified until the next fetch/doWork iteration, and all use of it is from the same thread. I believe it is thus safe to use it without making a copy.

Contributor

Thanks for the explanation. That makes sense.

Comment thread core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
Contributor

@junrao junrao left a comment

@lbradstreet : Thanks for the latest PR. LGTM

@guozhangwang
Contributor

LGTM. I will merge after the jenkins job completes.

@lbradstreet
Contributor Author

lbradstreet commented Oct 16, 2019

Failures in:
2.12

kafka.api.SaslSslAdminClientIntegrationTest.testCreateTopicsResponseMetadataAndConfig
kafka.log.LogCleanerIntegrationTest.testIsThreadFailed
org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinIntegrationTest.doLeftJoinFilterOutRapidlyChangingForeignKeyValues
org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinIntegrationTest.doInnerJoinFilterOutRapidlyChangingForeignKeyValues

2.13:

kafka.admin.ReassignPartitionsClusterTest.shouldTriggerReassignmentWithZnodePrecedenceOnControllerStartup
kafka.api.SaslSslAdminClientIntegrationTest.testCreateTopicsResponseMetadataAndConfig

@guozhangwang guozhangwang changed the title KAFKA-9039: Improve replica fetcher performance for high partition count clusters KAFKA-9039: Optimize ReplicaFetcher fetch path Oct 16, 2019
@guozhangwang guozhangwang merged commit 8966d06 into apache:trunk Oct 16, 2019