From d2a13d82702e0e0f0b2275b984b4babdbd976ae3 Mon Sep 17 00:00:00 2001
From: Lucas Bradstreet
Date: Tue, 1 Oct 2019 13:18:40 -0700
Subject: [PATCH 01/12] Add replica fetcher benchmark

---
 checkstyle/import-control-jmh-benchmarks.xml  |   1 +
 .../kafka/server/AbstractFetcherThread.scala  |   2 +
 .../server/ReplicaAlterLogDirsThread.scala    |   4 +
 .../kafka/server/ReplicaFetcherThread.scala   |   8 +-
 .../server/AbstractFetcherThreadTest.scala    |   2 +
 .../ReplicaFetcherThreadBenchmark.java        | 314 ++++++++++++++++++
 6 files changed, 329 insertions(+), 2 deletions(-)
 create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java

diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml
index 49bd8c786920d..d19a9b6f60f9f 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -36,6 +36,7 @@
+
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 6e2e5da4c9a5b..1d8f42218ea43 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -83,6 +83,8 @@ abstract class AbstractFetcherThread(name: String,
 
   protected def latestEpoch(topicPartition: TopicPartition): Option[Int]
 
+  protected def logStartOffset(topicPartition: TopicPartition): Long
+
   protected def logEndOffset(topicPartition: TopicPartition): Long
 
   protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch]
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index fdb2bfd6c0365..0b9c823774825 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -59,6 +59,10 @@ class ReplicaAlterLogDirsThread(name: String,
     replicaMgr.futureLocalLogOrException(topicPartition).latestEpoch
   }
 
+  override protected def logStartOffset(topicPartition: TopicPartition): Long = {
+    replicaMgr.futureLocalLogOrException(topicPartition).logStartOffset
+  }
+
   override protected def logEndOffset(topicPartition: TopicPartition): Long = {
     replicaMgr.futureLocalLogOrException(topicPartition).logEndOffset
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 1d2fdeed2d65c..f033b2f797e2c 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -96,12 +96,16 @@ class ReplicaFetcherThread(name: String,
   private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
   private val brokerSupportsLeaderEpochRequest = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
-  private val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
+  val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
 
   override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
     replicaMgr.localLogOrException(topicPartition).latestEpoch
   }
 
+  override protected def logStartOffset(topicPartition: TopicPartition): Long = {
+    replicaMgr.localLogOrException(topicPartition).logStartOffset
+  }
+
   override protected def logEndOffset(topicPartition: TopicPartition): Long = {
     replicaMgr.localLogOrException(topicPartition).logEndOffset
   }
 
@@ -244,7 +248,7 @@ class ReplicaFetcherThread(name: String,
       // We will not include a replica in the fetch request if it should be throttled.
       if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, topicPartition)) {
         try {
-          val logStartOffset = replicaMgr.localLogOrException(topicPartition).logStartOffset
+          val logStartOffset = this.logStartOffset(topicPartition)
           builder.add(topicPartition, new FetchRequest.PartitionData(
             fetchState.fetchOffset, logStartOffset, fetchSize, Optional.of(fetchState.currentLeaderEpoch)))
         } catch {
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 55c38a1deab80..13534ebd709e3 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -919,6 +919,8 @@ class AbstractFetcherThreadTest {
       state.log.lastOption.map(_.partitionLeaderEpoch).orElse(Some(EpochEndOffset.UNDEFINED_EPOCH))
     }
 
+    override def logStartOffset(topicPartition: TopicPartition): Long = replicaPartitionState(topicPartition).logStartOffset
+
     override def logEndOffset(topicPartition: TopicPartition): Long = replicaPartitionState(topicPartition).logEndOffset
 
     override def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = {
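The new benchmark class below is the bulk of the patch. It stands up a real LogManager and a Pool of Partition objects, mocks the pieces that would otherwise touch ZooKeeper or the network (PartitionStateStore, OffsetCheckpoints, and the leader fetch itself), and then measures steady-state ReplicaFetcherThread.doWork() calls at partition counts from 100 to 5000.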
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
new file mode 100644
index 0000000000000..b82785f746f31
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.fetcher;
+
+import kafka.api.ApiVersion$;
+import kafka.cluster.BrokerEndPoint;
+import kafka.cluster.DelayedOperations;
+import kafka.cluster.Partition;
+import kafka.cluster.PartitionStateStore;
+import kafka.log.CleanerConfig;
+import kafka.log.Defaults;
+import kafka.log.LogAppendInfo;
+import kafka.log.LogConfig;
+import kafka.log.LogManager;
+import kafka.server.BrokerState;
+import kafka.server.BrokerTopicStats;
+import kafka.server.FailedPartitions;
+import kafka.server.KafkaConfig;
+import kafka.server.LogDirFailureChannel;
+import kafka.server.MetadataCache;
+import kafka.server.OffsetAndEpoch;
+import kafka.server.OffsetTruncationState;
+import kafka.server.ReplicaFetcherThread;
+import kafka.server.ReplicaQuota;
+import kafka.server.checkpoints.OffsetCheckpoints;
+import kafka.utils.KafkaScheduler;
+import kafka.utils.Pool;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.LeaderAndIsrRequestData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.RecordsSend;
+import org.apache.kafka.common.requests.EpochEndOffset;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.mockito.Mockito;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.Iterator;
+import scala.collection.JavaConverters;
+import scala.compat.java8.OptionConverters;
+import scala.collection.Map;
+import scala.collection.Seq;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+
+public class ReplicaFetcherThreadBenchmark {
+    @Param({"100", "500", "1000", "5000"})
+    private int partitionCount;
+
+    private ReplicaFetcherBenchThread fetcher;
+    private LogManager logManager;
+    private File logDir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
+    private KafkaScheduler scheduler = new KafkaScheduler(1, "scheduler", true);
+    private Pool<TopicPartition, Partition> pool = new Pool<TopicPartition, Partition>(Option.empty());
+
+    @Setup(Level.Trial)
+    public void setup() throws IOException {
+        if (!logDir.mkdir())
+            throw new IOException("error creating test directory");
+
+        scheduler.startup();
+        Properties props = new Properties();
+        props.put("zookeeper.connect", "127.0.0.1:9999");
+        KafkaConfig config = new KafkaConfig(props);
+        LogConfig logConfig = createLogConfig();
+
+        List<File> logDirs = Collections.singletonList(logDir);
+        BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+        LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
+        logManager = new LogManager(JavaConverters.asScalaIteratorConverter(logDirs.iterator()).asScala().toSeq(),
+                JavaConverters.asScalaIteratorConverter(new ArrayList<File>().iterator()).asScala().toSeq(),
+                new scala.collection.mutable.HashMap<>(),
+                logConfig,
+                new CleanerConfig(0, 0, 0, 0, 0, 0.0, 0, false, "MD5"),
+                1,
+                1000L,
+                10000L,
+                10000L,
+                1000L,
+                60000,
+                scheduler,
+                new BrokerState(),
+                brokerTopicStats,
+                logDirFailureChannel,
+                Time.SYSTEM);
+
+        LinkedHashMap<TopicPartition, FetchResponse.PartitionData<BaseRecords>> initialFetched = new LinkedHashMap<>();
+        scala.collection.mutable.Map<TopicPartition, OffsetAndEpoch> offsetAndEpochs = new scala.collection.mutable.HashMap<>();
+        for (int i = 0; i < partitionCount; i++) {
+            TopicPartition tp = new TopicPartition("topic", i);
+
+            List<Integer> replicas = Arrays.asList(0, 1, 2);
+            LeaderAndIsrRequestData.LeaderAndIsrPartitionState partitionState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
+                    .setControllerEpoch(0)
+                    .setLeader(0)
+                    .setLeaderEpoch(0)
+                    .setIsr(replicas)
+                    .setZkVersion(1)
+                    .setReplicas(replicas)
+                    .setIsNew(true);
+
+            PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
+            Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
+            OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class);
+            Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L));
+            Partition partition = new Partition(tp, 100, ApiVersion$.MODULE$.latestVersion(),
+                    0, Time.SYSTEM, partitionStateStore, new DelayedOperationsMock(tp),
+                    Mockito.mock(MetadataCache.class), logManager);
+
+            partition.makeFollower(0, partitionState, 0, offsetCheckpoints);
+            pool.put(tp, partition);
+            offsetAndEpochs.put(tp, new OffsetAndEpoch(0, 0));
+            BaseRecords fetched = new BaseRecords() {
+                @Override
+                public int sizeInBytes() {
+                    return 0;
+                }
+
+                @Override
+                public RecordsSend toSend(String destination) {
+                    return null;
+                }
+            };
+            initialFetched.put(tp, new FetchResponse.PartitionData<>(Errors.NONE, 0, 0, 0,
+                    new LinkedList<>(), fetched));
+        }
+
+        fetcher = new ReplicaFetcherBenchThread(config, pool);
+        fetcher.addPartitions(offsetAndEpochs);
+        // force a pass to move partitions to fetching state. We do this in the setup phase
+        // so that we do not measure this time as part of the steady state work
+        fetcher.doWork();
+        // handle response to engage the incremental fetch session handler
+        fetcher.fetchSessionHandler().handleResponse(new FetchResponse<>(Errors.NONE, initialFetched, 0, 999));
+    }
+
+    @TearDown(Level.Trial)
+    public void tearDown() throws IOException {
+        logManager.shutdown();
+        scheduler.shutdown();
+        Utils.delete(logDir);
+    }
+
+    @Benchmark
+    public long testFetcher() {
+        fetcher.doWork();
+        return fetcher.fetcherStats().requestRate().count();
+    }
+
+    // avoid mocked DelayedOperations to avoid mocked class affecting benchmark results
+    private static class DelayedOperationsMock extends DelayedOperations {
+        DelayedOperationsMock(TopicPartition topicPartition) {
+            super(topicPartition, null, null, null);
+        }
+
+        @Override
+        public int numDelayedDelete() {
+            return 0;
+        }
+    }
+
+    private static LogConfig createLogConfig() {
+        Properties logProps = new Properties();
+        logProps.put(LogConfig.SegmentMsProp(), Defaults.SegmentMs());
+        logProps.put(LogConfig.SegmentBytesProp(), Defaults.SegmentSize());
+        logProps.put(LogConfig.RetentionMsProp(), Defaults.RetentionMs());
+        logProps.put(LogConfig.RetentionBytesProp(), Defaults.RetentionSize());
+        logProps.put(LogConfig.SegmentJitterMsProp(), Defaults.SegmentJitterMs());
+        logProps.put(LogConfig.CleanupPolicyProp(), Defaults.CleanupPolicy());
+        logProps.put(LogConfig.MaxMessageBytesProp(), Defaults.MaxMessageSize());
+        logProps.put(LogConfig.IndexIntervalBytesProp(), Defaults.IndexInterval());
+        logProps.put(LogConfig.SegmentIndexBytesProp(), Defaults.MaxIndexSize());
+        logProps.put(LogConfig.MessageFormatVersionProp(), Defaults.MessageFormatVersion());
+        logProps.put(LogConfig.FileDeleteDelayMsProp(), Defaults.FileDeleteDelayMs());
+        return LogConfig.apply(logProps, new scala.collection.immutable.HashSet<>());
+    }
+
+
+    static class ReplicaFetcherBenchThread extends ReplicaFetcherThread {
+        private final Pool<TopicPartition, Partition> pool;
+
+        ReplicaFetcherBenchThread(KafkaConfig config, Pool<TopicPartition, Partition> partitions) {
+            super("name",
+                    3,
+                    new BrokerEndPoint(3, "host", 3000),
+                    config,
+                    new FailedPartitions(),
+                    null,
+                    new Metrics(),
+                    Time.SYSTEM,
+                    new ReplicaQuota() {
+                        @Override
+                        public boolean isQuotaExceeded() {
+                            return false;
+                        }
+
+                        @Override
+                        public void record(long value) {
+                        }
+
+                        @Override
+                        public boolean isThrottled(TopicPartition topicPartition) {
+                            return false;
+                        }
+                    },
+                    Option.empty());
+
+            pool = partitions;
+        }
+
+        @Override
+        public Option<Object> latestEpoch(TopicPartition topicPartition) {
+            return Option.apply(0);
+        }
+
+        @Override
+        public long logStartOffset(TopicPartition topicPartition) {
+            return pool.get(topicPartition).localLogOrException().logStartOffset();
+        }
+
+        @Override
+        public long logEndOffset(TopicPartition topicPartition) {
+            return 0;
+        }
+
+        @Override
+        public void truncate(TopicPartition tp, OffsetTruncationState offsetTruncationState) {
+            // pretend to truncate to move to Fetching state
+        }
+
+        @Override
+        public Option<OffsetAndEpoch> endOffsetForEpoch(TopicPartition topicPartition, int epoch) {
+            return OptionConverters.toScala(Optional.of(new OffsetAndEpoch(0, 0)));
+        }
+
+        @Override
+        public Option<LogAppendInfo> processPartitionData(TopicPartition topicPartition, long fetchOffset, FetchResponse.PartitionData<Records> partitionData) {
+            return Option.empty();
+        }
+
+        @Override
+        public long fetchEarliestOffsetFromLeader(TopicPartition topicPartition, int currentLeaderEpoch) {
+            return 0;
+        }
+
+        @Override
+        public Map<TopicPartition, EpochEndOffset> fetchEpochEndOffsets(Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> partitions) {
+            scala.collection.mutable.Map<TopicPartition, EpochEndOffset> endOffsets = new scala.collection.mutable.HashMap<>();
+            Iterator<TopicPartition> iterator = partitions.keys().iterator();
+            while (iterator.hasNext()) {
+                endOffsets.put(iterator.next(), new EpochEndOffset(0, 100));
+            }
+            return endOffsets;
+        }
+
+        @Override
+        public Seq<Tuple2<TopicPartition, FetchResponse.PartitionData<Records>>> fetchFromLeader(FetchRequest.Builder fetchRequest) {
+            return JavaConverters.asScalaIteratorConverter(new ArrayList<Tuple2<TopicPartition, FetchResponse.PartitionData<Records>>>().iterator()).asScala().toSeq();
+        }
+    }
+}
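For anyone iterating on this patch, a minimal standalone JMH runner along the following lines can drive just this benchmark; the wrapper class and the parameter subset chosen here are illustrative, not part of the patch:

    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    public class ReplicaFetcherThreadBenchmarkRunner {
        public static void main(String[] args) throws RunnerException {
            Options opts = new OptionsBuilder()
                    // match the benchmark class added above by simple name
                    .include(ReplicaFetcherThreadBenchmark.class.getSimpleName())
                    // sweep a subset of the @Param values to keep runs short
                    .param("partitionCount", "100", "1000")
                    .build();
            new Runner(opts).run();
        }
    }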
From 854588804d7cf96a10d6e4a74db533deb7e26b55 Mon Sep 17 00:00:00 2001
From: Lucas Bradstreet
Date: Tue, 1 Oct 2019 13:46:30 -0700
Subject: [PATCH 02/12] Avoid gettimeofday calls in steady state fetch states

---
 .../scala/kafka/server/AbstractFetcherThread.scala   | 12 ++++++------
 .../kafka/server/ReplicaAlterLogDirsThreadTest.scala |  6 +++---
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 1d8f42218ea43..c75c1a8602555 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -31,7 +31,7 @@ import kafka.utils.CoreUtils.inLock
 import org.apache.kafka.common.protocol.Errors
 import AbstractFetcherThread._
 
-import scala.collection.{Map, Seq, Set, mutable}
+import scala.collection.{mutable, Map, Seq, Set}
 import scala.collection.JavaConverters._
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
@@ -615,7 +615,7 @@ abstract class AbstractFetcherThread(name: String,
     Option(partitionStates.stateValue(partition)).foreach { currentFetchState =>
       if (!currentFetchState.isDelayed) {
         partitionStates.updateAndMoveToEnd(partition, PartitionFetchState(currentFetchState.fetchOffset,
-          currentFetchState.currentLeaderEpoch, new DelayedItem(delay), currentFetchState.state))
+          currentFetchState.currentLeaderEpoch, Some(new DelayedItem(delay)), currentFetchState.state))
       }
     }
   }
@@ -757,7 +757,7 @@ case object Fetching extends ReplicaState
 
 object PartitionFetchState {
   def apply(offset: Long, currentLeaderEpoch: Int, state: ReplicaState): PartitionFetchState = {
-    PartitionFetchState(offset, currentLeaderEpoch, new DelayedItem(0), state)
+    PartitionFetchState(offset, currentLeaderEpoch, None, state)
   }
 }
 
@@ -771,20 +771,20 @@ object PartitionFetchState {
  */
 case class PartitionFetchState(fetchOffset: Long,
                                currentLeaderEpoch: Int,
-                               delay: DelayedItem,
+                               delay: Option[DelayedItem],
                                state: ReplicaState) {
 
   def isReadyForFetch: Boolean = state == Fetching && !isDelayed
 
   def isTruncating: Boolean = state == Truncating && !isDelayed
 
-  def isDelayed: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) > 0
+  def isDelayed: Boolean = delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)
 
   override def toString: String = {
     s"FetchState(fetchOffset=$fetchOffset" +
       s", currentLeaderEpoch=$currentLeaderEpoch" +
       s", state=$state" +
-      s", delay=${delay.delayMs}ms" +
+      s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" +
       s")"
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 79457c0ee825c..33b0ae394046e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -587,7 +587,7 @@ class ReplicaAlterLogDirsThreadTest {
 
     // one partition is ready and one is delayed
     val ResultWithPartitions(fetchRequest2Opt, partitionsWithError2) = thread.buildFetch(Map(
       t1p0 -> PartitionFetchState(140, leaderEpoch, state = Fetching),
-      t1p1 -> PartitionFetchState(160, leaderEpoch, delay = new DelayedItem(5000), state = Fetching)))
+      t1p1 -> PartitionFetchState(160, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching)))
 
     assertTrue(fetchRequest2Opt.isDefined)
     val fetchRequest2 = fetchRequest2Opt.get
@@ -600,8 +600,8 @@ class ReplicaAlterLogDirsThreadTest {
 
     // both partitions are delayed
     val ResultWithPartitions(fetchRequest3Opt, partitionsWithError3) = thread.buildFetch(Map(
-      t1p0 -> PartitionFetchState(140, leaderEpoch, delay = new DelayedItem(5000), state = Fetching),
-      t1p1 -> PartitionFetchState(160, leaderEpoch, delay = new DelayedItem(5000), state = Fetching)))
+      t1p0 -> PartitionFetchState(140, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching),
+      t1p1 -> PartitionFetchState(160, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching)))
     assertTrue("Expected no fetch requests since all partitions are delayed", fetchRequest3Opt.isEmpty)
     assertFalse(partitionsWithError3.nonEmpty)
   }
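The mechanism behind this change: DelayedItem captures the current time at construction, and the previous code allocated one (with a zero delay) for every PartitionFetchState, so every state the fetcher created paid for a clock lookup even though partitions are rarely delayed. Modeling the delay as an Option[DelayedItem] that is None in the common case keeps clock reads out of the steady-state fetch path.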
From 29fdd6094b828153c762f1d99b645481f7200cee Mon Sep 17 00:00:00 2001
From: Lucas Bradstreet
Date: Tue, 1 Oct 2019 15:41:36 -0700
Subject: [PATCH 03/12] Avoid copying partition states to maintain fetch offsets

We already have the partition fetch data in the fetch session, so a copy is not required.

---
 .../common/internals/PartitionStates.java     |  2 +-
 .../kafka/server/AbstractFetcherThread.scala  | 41 ++++++++++---------
 .../server/ReplicaAlterLogDirsThread.scala    | 11 ++---
 .../kafka/server/ReplicaFetcherThread.scala   |  7 ++--
 .../server/AbstractFetcherThreadTest.scala    |  7 ++--
 .../ReplicaAlterLogDirsThreadTest.scala       | 11 ++---
 6 files changed, 42 insertions(+), 37 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
index daad3550738b0..c289eaa59b719 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
@@ -101,7 +101,7 @@ public Stream<PartitionState<S>> stream() {
     }
 
     public LinkedHashMap<TopicPartition, S> partitionStateMap() {
-        return new LinkedHashMap<>(map);
+        return map;
     }
 
     /**
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index c75c1a8602555..dc20019c5d297 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -18,6 +18,7 @@
 package kafka.server
 
 import java.nio.ByteBuffer
+import java.util
 import java.util.Optional
 import java.util.concurrent.locks.ReentrantLock
 
@@ -29,16 +30,17 @@ import kafka.common.ClientIdAndBroker
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.CoreUtils.inLock
 import org.apache.kafka.common.protocol.Errors
-import AbstractFetcherThread._
 
 import scala.collection.{mutable, Map, Seq, Set}
 import scala.collection.JavaConverters._
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
-import java.util.function.Consumer
+import java.util.function.BiConsumer
 
 import com.yammer.metrics.core.Gauge
 import kafka.log.LogAppendInfo
+import kafka.server.AbstractFetcherThread.Fetch
+import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import org.apache.kafka.common.{InvalidRecordException,
TopicPartition} import org.apache.kafka.common.internals.PartitionStates import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records} @@ -79,7 +81,8 @@ abstract class AbstractFetcherThread(name: String, protected def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit - protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] + protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[Fetch]] + protected def latestEpoch(topicPartition: TopicPartition): Option[Int] @@ -117,9 +120,8 @@ abstract class AbstractFetcherThread(name: String, } private def maybeFetch(): Unit = { - val (fetchStates, fetchRequestOpt) = inLock(partitionMapLock) { - val fetchStates = partitionStates.partitionStateMap.asScala - val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(fetchStates) + val fetchRequestOpt = inLock(partitionMapLock) { + val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(partitionStates.partitionStateMap.asScala) handlePartitionsWithErrors(partitionsWithError, "maybeFetch") @@ -128,11 +130,11 @@ abstract class AbstractFetcherThread(name: String, partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) } - (fetchStates, fetchRequestOpt) + fetchRequestOpt } - fetchRequestOpt.foreach { fetchRequest => - processFetchRequest(fetchStates, fetchRequest) + fetchRequestOpt.foreach { case Fetch(sessionPartitions, fetchRequest) => + processFetchRequest(sessionPartitions, fetchRequest) } } @@ -152,13 +154,12 @@ abstract class AbstractFetcherThread(name: String, val partitionsWithEpochs = mutable.Map.empty[TopicPartition, EpochData] val partitionsWithoutEpochs = mutable.Set.empty[TopicPartition] - partitionStates.stream().forEach(new Consumer[PartitionStates.PartitionState[PartitionFetchState]] { - override def accept(state: PartitionStates.PartitionState[PartitionFetchState]): Unit = { - if (state.value.isTruncating) { - val tp = state.topicPartition + partitionStates.partitionStateMap.forEach(new BiConsumer[TopicPartition, PartitionFetchState] { + override def accept(tp: TopicPartition, state: PartitionFetchState): Unit = { + if (state.isTruncating) { latestEpoch(tp) match { case Some(epoch) if isOffsetForLeaderEpochSupported => - partitionsWithEpochs += tp -> new EpochData(Optional.of(state.value.currentLeaderEpoch), epoch) + partitionsWithEpochs += tp -> new EpochData(Optional.of(state.currentLeaderEpoch), epoch) case _ => partitionsWithoutEpochs += tp } @@ -278,7 +279,7 @@ abstract class AbstractFetcherThread(name: String, } } - private def processFetchRequest(fetchStates: Map[TopicPartition, PartitionFetchState], + private def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData], fetchRequest: FetchRequest.Builder): Unit = { val partitionsWithError = mutable.Set[TopicPartition]() var responseData: Seq[(TopicPartition, FetchData)] = Seq.empty @@ -309,8 +310,8 @@ abstract class AbstractFetcherThread(name: String, // It's possible that a partition is removed and re-added or truncated when there is a pending fetch request. // In this case, we only want to process the fetch response if the partition state is ready for fetch and // the current offset is the same as the offset requested. 
- val fetchState = fetchStates(topicPartition) - if (fetchState.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) { + val fetchPartitionData = sessionPartitions.get(topicPartition) + if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) { partitionData.error match { case Errors.NONE => try { @@ -326,8 +327,7 @@ abstract class AbstractFetcherThread(name: String, // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data if (validBytes > 0 && partitionStates.contains(topicPartition)) { // Update partitionStates only if there is no exception during processPartitionData - val newFetchState = PartitionFetchState(nextOffset, fetchState.currentLeaderEpoch, - state = Fetching) + val newFetchState = PartitionFetchState(nextOffset, fetchPartitionData.currentLeaderEpoch.get(), state = Fetching) partitionStates.updateAndMoveToEnd(topicPartition, newFetchState) fetcherStats.byteRate.mark(validBytes) } @@ -358,7 +358,7 @@ abstract class AbstractFetcherThread(name: String, case Errors.UNKNOWN_LEADER_EPOCH => debug(s"Remote broker has a smaller leader epoch for partition $topicPartition than " + - s"this replica's current leader epoch of ${fetchState.currentLeaderEpoch}.") + s"this replica's current leader epoch of ${fetchPartitionData.currentLeaderEpoch}.") partitionsWithError += topicPartition case Errors.FENCED_LEADER_EPOCH => @@ -667,6 +667,7 @@ abstract class AbstractFetcherThread(name: String, object AbstractFetcherThread { + case class Fetch(partitionData: util.Map[TopicPartition, FetchRequest.PartitionData], fetchRequest: FetchRequest.Builder) case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition]) } diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index 0b9c823774825..48f736a3a0f16 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -23,6 +23,7 @@ import java.util.Optional import kafka.api.Request import kafka.cluster.BrokerEndPoint import kafka.log.LogAppendInfo +import kafka.server.AbstractFetcherThread.Fetch import kafka.server.AbstractFetcherThread.ResultWithPartitions import kafka.server.QuotaFactory.UnboundedQuota import org.apache.kafka.common.TopicPartition @@ -34,7 +35,7 @@ import org.apache.kafka.common.requests.FetchResponse.PartitionData import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse} import scala.collection.JavaConverters._ -import scala.collection.{Map, Seq, Set, mutable} +import scala.collection.{mutable, Map, Seq, Set} class ReplicaAlterLogDirsThread(name: String, sourceBroker: BrokerEndPoint, @@ -222,7 +223,7 @@ class ReplicaAlterLogDirsThread(name: String, nextPartitionOpt } - private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[FetchRequest.Builder]] = { + private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[Fetch]] = { val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] val partitionsWithError = mutable.Set[TopicPartition]() @@ -241,14 +242,14 @@ class ReplicaAlterLogDirsThread(name: String, } else { // Set maxWait and minBytes to 0 because the response should return immediately if // the 
future log has caught up with the current log of the partition - Some(FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap) - .setMaxBytes(maxBytes)) + val requestBuilder = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap).setMaxBytes(maxBytes) + Some(Fetch(requestMap, requestBuilder)) } ResultWithPartitions(fetchRequestOpt, partitionsWithError) } - def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = { + def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[Fetch]] = { // Only include replica in the fetch request if it is not throttled. if (quota.isQuotaExceeded) { ResultWithPartitions(None, Set.empty) diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index f033b2f797e2c..21922427dd386 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -22,6 +22,7 @@ import java.util.Optional import kafka.api._ import kafka.cluster.BrokerEndPoint import kafka.log.LogAppendInfo +import kafka.server.AbstractFetcherThread.Fetch import kafka.server.AbstractFetcherThread.ResultWithPartitions import org.apache.kafka.clients.FetchSessionHandler import org.apache.kafka.common.TopicPartition @@ -34,7 +35,7 @@ import org.apache.kafka.common.requests._ import org.apache.kafka.common.utils.{LogContext, Time} import scala.collection.JavaConverters._ -import scala.collection.{Map, mutable} +import scala.collection.{mutable, Map} class ReplicaFetcherThread(name: String, fetcherId: Int, @@ -240,7 +241,7 @@ class ReplicaFetcherThread(name: String, } } - override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = { + override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[Fetch]] = { val partitionsWithError = mutable.Set[TopicPartition]() val builder = fetchSessionHandler.newBuilder() @@ -269,7 +270,7 @@ class ReplicaFetcherThread(name: String, .setMaxBytes(maxBytes) .toForget(fetchData.toForget) .metadata(fetchData.metadata) - Some(requestBuilder) + Some(Fetch(fetchData.sessionPartitions(), requestBuilder)) } ResultWithPartitions(fetchRequestOpt, partitionsWithError) diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala index 13534ebd709e3..f37e201850e2c 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala @@ -25,6 +25,7 @@ import com.yammer.metrics.Metrics import kafka.cluster.BrokerEndPoint import kafka.log.LogAppendInfo import kafka.message.NoCompressionCodec +import kafka.server.AbstractFetcherThread.Fetch import kafka.server.AbstractFetcherThread.ResultWithPartitions import kafka.utils.TestUtils import org.apache.kafka.common.KafkaException @@ -38,7 +39,7 @@ import org.junit.Assert._ import org.junit.{Before, Test} import scala.collection.JavaConverters._ -import scala.collection.{Map, Set, mutable} +import scala.collection.{mutable, Map, Set} import scala.util.Random import org.scalatest.Assertions.assertThrows @@ -901,7 +902,7 @@ class AbstractFetcherThreadTest { state.highWatermark = offset } - override def 
buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = { + override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[Fetch]] = { val fetchData = mutable.Map.empty[TopicPartition, FetchRequest.PartitionData] partitionMap.foreach { case (partition, state) => if (state.isReadyForFetch) { @@ -911,7 +912,7 @@ class AbstractFetcherThreadTest { } } val fetchRequest = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 1, fetchData.asJava) - ResultWithPartitions(Some(fetchRequest), Set.empty) + ResultWithPartitions(Some(Fetch(fetchData.asJava, fetchRequest)), Set.empty) } override def latestEpoch(topicPartition: TopicPartition): Option[Int] = { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala index 33b0ae394046e..1098267c7ed63 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala @@ -20,6 +20,7 @@ import java.util.Optional import kafka.cluster.{BrokerEndPoint, Partition} import kafka.log.{Log, LogManager} +import kafka.server.AbstractFetcherThread.Fetch import kafka.server.AbstractFetcherThread.ResultWithPartitions import kafka.utils.{DelayedItem, TestUtils} import org.apache.kafka.common.TopicPartition @@ -524,7 +525,7 @@ class ReplicaAlterLogDirsThreadTest { t1p1 -> PartitionFetchState(160, leaderEpoch, state = Fetching))) assertTrue(fetchRequestOpt.isDefined) - val fetchRequest = fetchRequestOpt.get + val fetchRequest = fetchRequestOpt.get.fetchRequest assertFalse(fetchRequest.fetchData.isEmpty) assertFalse(partitionsWithError.nonEmpty) val request = fetchRequest.build() @@ -577,9 +578,9 @@ class ReplicaAlterLogDirsThreadTest { assertTrue(fetchRequestOpt.isDefined) val fetchRequest = fetchRequestOpt.get - assertFalse(fetchRequest.fetchData.isEmpty) + assertFalse(fetchRequest.partitionData.isEmpty) assertFalse(partitionsWithError.nonEmpty) - val fetchInfos = fetchRequest.build().fetchData.asScala.toSeq + val fetchInfos = fetchRequest.fetchRequest.build().fetchData.asScala.toSeq assertEquals(1, fetchInfos.length) assertEquals("Expected fetch request for non-truncating partition", t1p0, fetchInfos.head._1) assertEquals(150, fetchInfos.head._2.fetchOffset) @@ -591,9 +592,9 @@ class ReplicaAlterLogDirsThreadTest { assertTrue(fetchRequest2Opt.isDefined) val fetchRequest2 = fetchRequest2Opt.get - assertFalse(fetchRequest2.fetchData.isEmpty) + assertFalse(fetchRequest2.partitionData.isEmpty) assertFalse(partitionsWithError2.nonEmpty) - val fetchInfos2 = fetchRequest2.build().fetchData.asScala.toSeq + val fetchInfos2 = fetchRequest2.fetchRequest.build().fetchData.asScala.toSeq assertEquals(1, fetchInfos2.length) assertEquals("Expected fetch request for non-delayed partition", t1p0, fetchInfos2.head._1) assertEquals(140, fetchInfos2.head._2.fetchOffset) From 0e57e3e725ff70ad8845681b02873229c3c64544 Mon Sep 17 00:00:00 2001 From: Lucas Bradstreet Date: Tue, 1 Oct 2019 16:18:20 -0700 Subject: [PATCH 04/12] Keep lag alongside PartitionFetchState to avoid expensive isReplicaInSync check. 
Reduce cost of updating map by not unnecessarily wrapping in ClientIdTopicPartition in FetcherStats --- .../kafka/server/AbstractFetcherThread.scala | 53 +++++++++---------- .../server/ReplicaAlterLogDirsThread.scala | 8 +-- .../kafka/server/ReplicaFetcherThread.scala | 13 +++-- .../server/AbstractFetcherManagerTest.scala | 2 +- .../server/AbstractFetcherThreadTest.scala | 6 +-- .../ReplicaAlterLogDirsThreadTest.scala | 18 +++---- 6 files changed, 47 insertions(+), 53 deletions(-) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index dc20019c5d297..97ada1d16a415 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -39,7 +39,7 @@ import java.util.function.BiConsumer import com.yammer.metrics.core.Gauge import kafka.log.LogAppendInfo -import kafka.server.AbstractFetcherThread.Fetch +import kafka.server.AbstractFetcherThread.ReplicaFetch import kafka.server.AbstractFetcherThread.ResultWithPartitions import org.apache.kafka.common.{InvalidRecordException, TopicPartition} import org.apache.kafka.common.internals.PartitionStates @@ -81,8 +81,7 @@ abstract class AbstractFetcherThread(name: String, protected def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit - protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[Fetch]] - + protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] protected def latestEpoch(topicPartition: TopicPartition): Option[Int] @@ -133,7 +132,7 @@ abstract class AbstractFetcherThread(name: String, fetchRequestOpt } - fetchRequestOpt.foreach { case Fetch(sessionPartitions, fetchRequest) => + fetchRequestOpt.foreach { case ReplicaFetch(sessionPartitions, fetchRequest) => processFetchRequest(sessionPartitions, fetchRequest) } } @@ -322,12 +321,13 @@ abstract class AbstractFetcherThread(name: String, logAppendInfoOpt.foreach { logAppendInfo => val validBytes = logAppendInfo.validBytes val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset - fetcherLagStats.getAndMaybePut(topicPartition).lag = Math.max(0L, partitionData.highWatermark - nextOffset) + val lag = Math.max(0L, partitionData.highWatermark - nextOffset) + fetcherLagStats.getAndMaybePut(topicPartition).lag = lag // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data if (validBytes > 0 && partitionStates.contains(topicPartition)) { // Update partitionStates only if there is no exception during processPartitionData - val newFetchState = PartitionFetchState(nextOffset, fetchPartitionData.currentLeaderEpoch.get(), state = Fetching) + val newFetchState = PartitionFetchState(nextOffset, Some(lag), currentFetchState.currentLeaderEpoch, state = Fetching) partitionStates.updateAndMoveToEnd(topicPartition, newFetchState) fetcherStats.byteRate.mark(validBytes) } @@ -390,7 +390,7 @@ abstract class AbstractFetcherThread(name: String, try { Option(partitionStates.stateValue(topicPartition)).foreach { state => val newState = PartitionFetchState(math.min(truncationOffset, state.fetchOffset), - state.currentLeaderEpoch, state.delay, state = Truncating) + None, state.currentLeaderEpoch, state.delay, state = Truncating) partitionStates.updateAndMoveToEnd(topicPartition, newState) 
partitionMapCond.signalAll() } @@ -420,7 +420,7 @@ abstract class AbstractFetcherThread(name: String, fetchOffsetAndTruncate(tp, initialFetchState.leaderEpoch) else initialFetchState.offset - PartitionFetchState(initialFetchOffset, initialFetchState.leaderEpoch, state = Truncating) + PartitionFetchState(initialFetchOffset, None, initialFetchState.leaderEpoch, state = Truncating) } partitionStates.updateAndMoveToEnd(tp, updatedState) } @@ -442,8 +442,8 @@ abstract class AbstractFetcherThread(name: String, val maybeTruncationComplete = fetchOffsets.get(state.topicPartition) match { case Some(offsetTruncationState) => val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating - PartitionFetchState(offsetTruncationState.offset, currentFetchState.currentLeaderEpoch, - currentFetchState.delay, state) + PartitionFetchState(offsetTruncationState.offset, None, + currentFetchState.currentLeaderEpoch, currentFetchState.delay, state) case None => currentFetchState } (state.topicPartition, maybeTruncationComplete) @@ -531,7 +531,7 @@ abstract class AbstractFetcherThread(name: String, fetchState: PartitionFetchState): Boolean = { try { val newOffset = fetchOffsetAndTruncate(topicPartition, fetchState.currentLeaderEpoch) - val newFetchState = PartitionFetchState(newOffset, fetchState.currentLeaderEpoch, state = Fetching) + val newFetchState = PartitionFetchState(newOffset, None, fetchState.currentLeaderEpoch, state = Fetching) partitionStates.updateAndMoveToEnd(topicPartition, newFetchState) info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " + s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset") @@ -615,7 +615,7 @@ abstract class AbstractFetcherThread(name: String, Option(partitionStates.stateValue(partition)).foreach { currentFetchState => if (!currentFetchState.isDelayed) { partitionStates.updateAndMoveToEnd(partition, PartitionFetchState(currentFetchState.fetchOffset, - currentFetchState.currentLeaderEpoch, Some(new DelayedItem(delay)), currentFetchState.state)) + None, currentFetchState.currentLeaderEpoch, Some(new DelayedItem(delay)), currentFetchState.state)) } } } @@ -667,7 +667,7 @@ abstract class AbstractFetcherThread(name: String, object AbstractFetcherThread { - case class Fetch(partitionData: util.Map[TopicPartition, FetchRequest.PartitionData], fetchRequest: FetchRequest.Builder) + case class ReplicaFetch(partitionData: util.Map[TopicPartition, FetchRequest.PartitionData], fetchRequest: FetchRequest.Builder) case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition]) } @@ -705,29 +705,21 @@ class FetcherLagMetrics(metricId: ClientIdTopicPartition) extends KafkaMetricsGr } class FetcherLagStats(metricId: ClientIdAndBroker) { - private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k) - val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory)) + private val valueFactory = (k: TopicPartition) => new FetcherLagMetrics(ClientIdTopicPartition(metricId.clientId, k)) + val stats = new Pool[TopicPartition, FetcherLagMetrics](Some(valueFactory)) def getAndMaybePut(topicPartition: TopicPartition): FetcherLagMetrics = { - stats.getAndMaybePut(ClientIdTopicPartition(metricId.clientId, topicPartition)) - } - - def isReplicaInSync(topicPartition: TopicPartition): Boolean = { - val fetcherLagMetrics = stats.get(ClientIdTopicPartition(metricId.clientId, topicPartition)) - if (fetcherLagMetrics != null) - fetcherLagMetrics.lag <= 0 - 
else - false + stats.getAndMaybePut(topicPartition) } def unregister(topicPartition: TopicPartition): Unit = { - val lagMetrics = stats.remove(ClientIdTopicPartition(metricId.clientId, topicPartition)) + val lagMetrics = stats.remove(topicPartition) if (lagMetrics != null) lagMetrics.unregister() } def unregister(): Unit = { - stats.keys.toBuffer.foreach { key: ClientIdTopicPartition => - unregister(key.topicPartition) + stats.keys.toBuffer.foreach { key: TopicPartition => + unregister(key) } } } @@ -757,8 +749,8 @@ case object Truncating extends ReplicaState case object Fetching extends ReplicaState object PartitionFetchState { - def apply(offset: Long, currentLeaderEpoch: Int, state: ReplicaState): PartitionFetchState = { - PartitionFetchState(offset, currentLeaderEpoch, None, state) + def apply(offset: Long, lag: Option[Long], currentLeaderEpoch: Int, state: ReplicaState): PartitionFetchState = { + PartitionFetchState(offset, lag, currentLeaderEpoch, None, state) } } @@ -771,12 +763,15 @@ object PartitionFetchState { * (3) ReadyForFetch, the is the active state where the thread is actively fetching data. */ case class PartitionFetchState(fetchOffset: Long, + lag: Option[Long], currentLeaderEpoch: Int, delay: Option[DelayedItem], state: ReplicaState) { def isReadyForFetch: Boolean = state == Fetching && !isDelayed + def isReplicaInSync: Boolean = lag.isDefined && lag.get <= 0 + def isTruncating: Boolean = state == Truncating && !isDelayed def isDelayed: Boolean = delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0) diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index 48f736a3a0f16..3b47b79626f08 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -23,7 +23,7 @@ import java.util.Optional import kafka.api.Request import kafka.cluster.BrokerEndPoint import kafka.log.LogAppendInfo -import kafka.server.AbstractFetcherThread.Fetch +import kafka.server.AbstractFetcherThread.ReplicaFetch import kafka.server.AbstractFetcherThread.ResultWithPartitions import kafka.server.QuotaFactory.UnboundedQuota import org.apache.kafka.common.TopicPartition @@ -223,7 +223,7 @@ class ReplicaAlterLogDirsThread(name: String, nextPartitionOpt } - private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[Fetch]] = { + private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[ReplicaFetch]] = { val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] val partitionsWithError = mutable.Set[TopicPartition]() @@ -243,13 +243,13 @@ class ReplicaAlterLogDirsThread(name: String, // Set maxWait and minBytes to 0 because the response should return immediately if // the future log has caught up with the current log of the partition val requestBuilder = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap).setMaxBytes(maxBytes) - Some(Fetch(requestMap, requestBuilder)) + Some(ReplicaFetch(requestMap, requestBuilder)) } ResultWithPartitions(fetchRequestOpt, partitionsWithError) } - def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[Fetch]] = { + def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = { // Only include replica in the fetch request if 
it is not throttled. if (quota.isQuotaExceeded) { ResultWithPartitions(None, Set.empty) diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 21922427dd386..310df756a07f7 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -22,7 +22,7 @@ import java.util.Optional import kafka.api._ import kafka.cluster.BrokerEndPoint import kafka.log.LogAppendInfo -import kafka.server.AbstractFetcherThread.Fetch +import kafka.server.AbstractFetcherThread.ReplicaFetch import kafka.server.AbstractFetcherThread.ResultWithPartitions import org.apache.kafka.clients.FetchSessionHandler import org.apache.kafka.common.TopicPartition @@ -241,13 +241,13 @@ class ReplicaFetcherThread(name: String, } } - override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[Fetch]] = { + override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = { val partitionsWithError = mutable.Set[TopicPartition]() val builder = fetchSessionHandler.newBuilder() partitionMap.foreach { case (topicPartition, fetchState) => // We will not include a replica in the fetch request if it should be throttled. - if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, topicPartition)) { + if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, fetchState, topicPartition)) { try { val logStartOffset = this.logStartOffset(topicPartition) builder.add(topicPartition, new FetchRequest.PartitionData( @@ -270,7 +270,7 @@ class ReplicaFetcherThread(name: String, .setMaxBytes(maxBytes) .toForget(fetchData.toForget) .metadata(fetchData.metadata) - Some(Fetch(fetchData.sessionPartitions(), requestBuilder)) + Some(ReplicaFetch(fetchData.sessionPartitions(), requestBuilder)) } ResultWithPartitions(fetchRequestOpt, partitionsWithError) @@ -335,9 +335,8 @@ class ReplicaFetcherThread(name: String, * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list, * the quota is exceeded and the replica is not in sync. 
*/ - private def shouldFollowerThrottle(quota: ReplicaQuota, topicPartition: TopicPartition): Boolean = { - val isReplicaInSync = fetcherLagStats.isReplicaInSync(topicPartition) - !isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded + private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = { + !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded } } diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala index 15ce97133809d..26eada76ba882 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala @@ -59,7 +59,7 @@ class AbstractFetcherManagerTest { EasyMock.expect(fetcher.start()) EasyMock.expect(fetcher.addPartitions(Map(tp -> OffsetAndEpoch(fetchOffset, leaderEpoch)))) EasyMock.expect(fetcher.fetchState(tp)) - .andReturn(Some(PartitionFetchState(fetchOffset, leaderEpoch, Truncating))) + .andReturn(Some(PartitionFetchState(fetchOffset, None, leaderEpoch, Truncating))) EasyMock.expect(fetcher.removePartitions(Set(tp))) EasyMock.expect(fetcher.fetchState(tp)).andReturn(None) EasyMock.replay(fetcher) diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala index f37e201850e2c..af280ab4d45e3 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala @@ -25,7 +25,7 @@ import com.yammer.metrics.Metrics import kafka.cluster.BrokerEndPoint import kafka.log.LogAppendInfo import kafka.message.NoCompressionCodec -import kafka.server.AbstractFetcherThread.Fetch +import kafka.server.AbstractFetcherThread.ReplicaFetch import kafka.server.AbstractFetcherThread.ResultWithPartitions import kafka.utils.TestUtils import org.apache.kafka.common.KafkaException @@ -902,7 +902,7 @@ class AbstractFetcherThreadTest { state.highWatermark = offset } - override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[Fetch]] = { + override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = { val fetchData = mutable.Map.empty[TopicPartition, FetchRequest.PartitionData] partitionMap.foreach { case (partition, state) => if (state.isReadyForFetch) { @@ -912,7 +912,7 @@ class AbstractFetcherThreadTest { } } val fetchRequest = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 1, fetchData.asJava) - ResultWithPartitions(Some(Fetch(fetchData.asJava, fetchRequest)), Set.empty) + ResultWithPartitions(Some(ReplicaFetch(fetchData.asJava, fetchRequest)), Set.empty) } override def latestEpoch(topicPartition: TopicPartition): Option[Int] = { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala index 1098267c7ed63..6a7d8c80d4e32 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala @@ -20,7 +20,7 @@ import java.util.Optional import kafka.cluster.{BrokerEndPoint, Partition} import kafka.log.{Log, LogManager} -import 
kafka.server.AbstractFetcherThread.Fetch +import kafka.server.AbstractFetcherThread.ReplicaFetch import kafka.server.AbstractFetcherThread.ResultWithPartitions import kafka.utils.{DelayedItem, TestUtils} import org.apache.kafka.common.TopicPartition @@ -521,8 +521,8 @@ class ReplicaAlterLogDirsThreadTest { t1p1 -> offsetAndEpoch(0L, leaderEpoch))) val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = thread.buildFetch(Map( - t1p0 -> PartitionFetchState(150, leaderEpoch, state = Fetching), - t1p1 -> PartitionFetchState(160, leaderEpoch, state = Fetching))) + t1p0 -> PartitionFetchState(150, None, leaderEpoch, None, state = Fetching), + t1p1 -> PartitionFetchState(160, None, leaderEpoch, None, state = Fetching))) assertTrue(fetchRequestOpt.isDefined) val fetchRequest = fetchRequestOpt.get.fetchRequest @@ -573,8 +573,8 @@ class ReplicaAlterLogDirsThreadTest { // one partition is ready and one is truncating val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = thread.buildFetch(Map( - t1p0 -> PartitionFetchState(150, leaderEpoch, state = Fetching), - t1p1 -> PartitionFetchState(160, leaderEpoch, state = Truncating))) + t1p0 -> PartitionFetchState(150, None, leaderEpoch, state = Fetching), + t1p1 -> PartitionFetchState(160, None, leaderEpoch, state = Truncating))) assertTrue(fetchRequestOpt.isDefined) val fetchRequest = fetchRequestOpt.get @@ -587,8 +587,8 @@ class ReplicaAlterLogDirsThreadTest { // one partition is ready and one is delayed val ResultWithPartitions(fetchRequest2Opt, partitionsWithError2) = thread.buildFetch(Map( - t1p0 -> PartitionFetchState(140, leaderEpoch, state = Fetching), - t1p1 -> PartitionFetchState(160, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching))) + t1p0 -> PartitionFetchState(140, None, leaderEpoch, state = Fetching), + t1p1 -> PartitionFetchState(160, None, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching))) assertTrue(fetchRequest2Opt.isDefined) val fetchRequest2 = fetchRequest2Opt.get @@ -601,8 +601,8 @@ class ReplicaAlterLogDirsThreadTest { // both partitions are delayed val ResultWithPartitions(fetchRequest3Opt, partitionsWithError3) = thread.buildFetch(Map( - t1p0 -> PartitionFetchState(140, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching), - t1p1 -> PartitionFetchState(160, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching))) + t1p0 -> PartitionFetchState(140, None, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching), + t1p1 -> PartitionFetchState(160, None, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching))) assertTrue("Expected no fetch requests since all partitions are delayed", fetchRequest3Opt.isEmpty) assertFalse(partitionsWithError3.nonEmpty) } From bbb1e9994061ee60356e7f8c374379ec0e18df4c Mon Sep 17 00:00:00 2001 From: Lucas Bradstreet Date: Tue, 1 Oct 2019 16:29:53 -0700 Subject: [PATCH 05/12] Fix logging of currentLeaderEpoch --- core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 97ada1d16a415..f4a9bc8da74bb 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -358,7 +358,7 @@ abstract class AbstractFetcherThread(name: String, case Errors.UNKNOWN_LEADER_EPOCH => debug(s"Remote broker has a smaller leader epoch for partition 
$topicPartition than " +
-            s"this replica's current leader epoch of ${fetchPartitionData.currentLeaderEpoch}.")
+            s"this replica's current leader epoch of ${currentFetchState.currentLeaderEpoch}.")
           partitionsWithError += topicPartition
 
         case Errors.FENCED_LEADER_EPOCH =>

From 2614b2417a6aac87e5decbae0a29d4f37ba9c7e3 Mon Sep 17 00:00:00 2001
From: Lucas Bradstreet
Date: Tue, 1 Oct 2019 16:39:42 -0700
Subject: [PATCH 06/12] Fetch session optimizations

---
 .../kafka/clients/FetchSessionHandler.java    |  50 +++++---
 .../kafka/server/ReplicaFetcherThread.scala   |   2 +-
 .../fetchsession/FetchSessionBenchmark.java   | 119 ++++++++++++++++++
 3 files changed, 156 insertions(+), 15 deletions(-)
 create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java

diff --git a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
index 0dc8943fdcbe6..d19759de7a04f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
@@ -185,7 +185,18 @@ public class Builder {
          * Another reason is because we make use of the list ordering to optimize the preparation of
          * incremental fetch requests (see below).
          */
-        private LinkedHashMap<TopicPartition, PartitionData> next = new LinkedHashMap<>();
+        private LinkedHashMap<TopicPartition, PartitionData> next;
+        private final boolean copySessionPartitions;
+
+        Builder() {
+            this.next = new LinkedHashMap<>();
+            this.copySessionPartitions = true;
+        }
+
+        Builder(int initialSize, boolean copySessionPartitions) {
+            this.next = new LinkedHashMap<>(initialSize);
+            this.copySessionPartitions = copySessionPartitions;
+        }
 
         /**
          * Mark that we want data from this partition in the upcoming fetch.
@@ -207,6 +218,7 @@ public FetchRequestData build() {
                 return new FetchRequestData(toSend, Collections.emptyList(), toSend, nextMetadata);
             }
 
+            LinkedHashMap<TopicPartition, PartitionData> updated = new LinkedHashMap<>();
             List<TopicPartition> added = new ArrayList<>();
             List<TopicPartition> removed = new ArrayList<>();
             List<TopicPartition> altered = new ArrayList<>();
@@ -215,16 +227,13 @@ public FetchRequestData build() {
                 Entry<TopicPartition, PartitionData> entry = iter.next();
                 TopicPartition topicPartition = entry.getKey();
                 PartitionData prevData = entry.getValue();
-                PartitionData nextData = next.get(topicPartition);
+                // process from next - removing the entry now so that later only
+                // added partitions are left
+                PartitionData nextData = next.remove(topicPartition);
                 if (nextData != null) {
-                    if (prevData.equals(nextData)) {
-                        // Omit this partition from the FetchRequest, because it hasn't changed
-                        // since the previous request.
- next.remove(topicPartition); - } else { - // Move the altered partition to the end of 'next' - next.remove(topicPartition); - next.put(topicPartition, nextData); + if (!prevData.equals(nextData)) { + // partition data was updated + updated.put(topicPartition, nextData); entry.setValue(nextData); altered.add(topicPartition); } @@ -247,6 +256,7 @@ public FetchRequestData build() { break; } sessionPartitions.put(topicPartition, nextData); + updated.put(topicPartition, nextData); added.add(topicPartition); } if (log.isDebugEnabled()) { @@ -255,10 +265,10 @@ public FetchRequestData build() { partitionsToLogString(altered), partitionsToLogString(removed), partitionsToLogString(sessionPartitions.keySet())); } - Map<TopicPartition, PartitionData> toSend = - Collections.unmodifiableMap(new LinkedHashMap<>(next)); - Map<TopicPartition, PartitionData> curSessionPartitions = - Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions)); + Map<TopicPartition, PartitionData> toSend = Collections.unmodifiableMap(updated); + Map<TopicPartition, PartitionData> curSessionPartitions = copySessionPartitions + ? Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions)) + : Collections.unmodifiableMap(sessionPartitions); next = null; return new FetchRequestData(toSend, Collections.unmodifiableList(removed), curSessionPartitions, nextMetadata); @@ -269,6 +279,18 @@ public Builder newBuilder() { return new Builder(); } + + /** A builder that allows for presizing the PartitionData hashmap, and avoiding making a + * secondary copy of the sessionPartitions, in cases where this is not necessary. + * This builder is primarily for use by the replica fetcher. + * @param size the initial size of the PartitionData hashmap + * @param copySessionPartitions boolean denoting whether the builder should make a deep copy of + * session partitions + */ + public Builder newBuilder(int size, boolean copySessionPartitions) { + return new Builder(size, copySessionPartitions); + } + private String partitionsToLogString(Collection<TopicPartition> partitions) { if (!log.isTraceEnabled()) { return String.format("%d partition(s)", partitions.size()); diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 310df756a07f7..5108944008a62 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -244,7 +244,7 @@ class ReplicaFetcherThread(name: String, override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = { val partitionsWithError = mutable.Set[TopicPartition]() - val builder = fetchSessionHandler.newBuilder() + val builder = fetchSessionHandler.newBuilder(partitionMap.size, false) partitionMap.foreach { case (topicPartition, fetchState) => // We will not include a replica in the fetch request if it should be throttled. if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, fetchState, topicPartition)) { diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java new file mode 100644 index 0000000000000..ede9f13a4479c --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.fetchsession; + +import org.apache.kafka.clients.FetchSessionHandler; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.utils.LogContext; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 10) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class FetchSessionBenchmark { + private static final LogContext LOG_CONTEXT = new LogContext("[BenchFetchSessionHandler]="); + + @Param(value = {"10", "100", "1000"}) + private int partitionCount; + + @Param(value = {"0", "10", "100"}) + private int updatedPercentage; + + @Param(value = {"false", "true"}) + private boolean presize; + + private LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetches; + private FetchSessionHandler handler; + + @Setup(Level.Trial) + public void setUp() { + fetches = new LinkedHashMap<>(); + handler = new FetchSessionHandler(LOG_CONTEXT, 1); + FetchSessionHandler.Builder builder = handler.newBuilder(); + + LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> respMap = new LinkedHashMap<>(); + for (int i = 0; i < partitionCount; i++) { + TopicPartition tp = new TopicPartition("foo", i); + FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(0, 0, 200, + Optional.empty()); + fetches.put(tp, partitionData); + builder.add(tp, partitionData); + respMap.put(tp, new FetchResponse.PartitionData<>( + Errors.NONE, + 0L, + 0L, + 0, + null, + null)); + } + FetchSessionHandler.FetchRequestData data = builder.build(); + // build and handle an initial response so that the next fetch will be incremental + handler.handleResponse(new FetchResponse<>(Errors.NONE, respMap, 0, 1)); + + int counter = 0; + for (TopicPartition topicPartition: new ArrayList<>(fetches.keySet())) { + if (updatedPercentage != 0 && counter % (100 / updatedPercentage) == 0) { + // reorder in fetch session, and update log start offset + fetches.remove(topicPartition); + fetches.put(topicPartition, new FetchRequest.PartitionData(50, 40, 200, + Optional.empty())); + } + counter++; + } + } + + @Benchmark
+ @OutputTimeUnit(TimeUnit.NANOSECONDS) + public void incrementalFetchSessionBuild() { + FetchSessionHandler.Builder builder; + if (presize) + builder = handler.newBuilder(fetches.size(), true); + else + builder = handler.newBuilder(); + + for (Map.Entry<TopicPartition, FetchRequest.PartitionData> entry: fetches.entrySet()) { + builder.add(entry.getKey(), entry.getValue()); + } + + builder.build(); + } +} From f0bf23acd8198eae0faed67c2414301eccf04dc6 Mon Sep 17 00:00:00 2001 From: Lucas Bradstreet Date: Fri, 11 Oct 2019 10:21:10 -0700 Subject: [PATCH 07/12] Avoid unnecessary sessionPartitions check. The new strategy ensures that only new partitions are left in the next map. --- .../org/apache/kafka/clients/FetchSessionHandler.java | 9 +-------- .../apache/kafka/clients/FetchSessionHandlerTest.java | 4 ++-- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java index d19759de7a04f..a2cac592c13f3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java @@ -244,17 +244,10 @@ public FetchRequestData build() { removed.add(topicPartition); } } - // Add any new partitions to the session. + // Only the new partitions are left in next. Add these new partitions to the session for (Entry<TopicPartition, PartitionData> entry : next.entrySet()) { TopicPartition topicPartition = entry.getKey(); PartitionData nextData = entry.getValue(); - if (sessionPartitions.containsKey(topicPartition)) { - // In the previous loop, all the partitions which existed in both sessionPartitions - // and next were moved to the end of next, or removed from next. Therefore, - // once we hit one of them, we know there are no more unseen entries to look - // at in next. - break; - } sessionPartitions.put(topicPartition, nextData); updated.put(topicPartition, nextData); added.add(topicPartition); diff --git a/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java index ec1b0624169af..9e797ccfe5b75 100644 --- a/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java @@ -248,8 +248,8 @@ public void testIncrementals() { new ReqEntry("foo", 1, 10, 120, 210), new ReqEntry("bar", 0, 20, 200, 200)), data2.sessionPartitions()); - assertMapEquals(reqMap(new ReqEntry("bar", 0, 20, 200, 200), - new ReqEntry("foo", 1, 10, 120, 210)), + assertMapEquals(reqMap(new ReqEntry("foo", 1, 10, 120, 210), - new ReqEntry("bar", 0, 20, 200, 200)), data2.toSend()); FetchResponse<MemoryRecords> resp2 = new FetchResponse<>(Errors.NONE, From 32e30ace5865e0a5b769cad626f64bcdf5091faf Mon Sep 17 00:00:00 2001 From: Lucas Bradstreet Date: Fri, 11 Oct 2019 16:35:59 -0700 Subject: [PATCH 08/12] Minimize differences in fetch session handler vs trunk We will get larger performance improvements by redesigning how incremental fetch sessions work in the replica fetcher; in the meantime, minimizing differences in the fetch session handler minimizes the risk of this changeset.
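To make the surviving optimization concrete, here is a condensed sketch of how the replica fetcher drives the presized, no-copy builder after this series (adapted from the buildFetch change in PATCH 06; throttling and error handling are elided, and partitionMap, fetchSessionHandler, fetchSize and logStartOffset are the existing ReplicaFetcherThread members):

    // Presize the builder to the partition count and skip the defensive deep
    // copy of sessionPartitions, which the replica fetcher never mutates.
    val builder = fetchSessionHandler.newBuilder(partitionMap.size, false)
    partitionMap.foreach { case (topicPartition, fetchState) =>
      if (fetchState.isReadyForFetch) {
        builder.add(topicPartition, new FetchRequest.PartitionData(
          fetchState.fetchOffset, logStartOffset(topicPartition), fetchSize,
          Optional.of(fetchState.currentLeaderEpoch)))
      }
    }
    // toSend contains only the added and altered partitions of the incremental fetch.
    val fetchRequestData = builder.build()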
--- checkstyle/import-control-jmh-benchmarks.xml | 1 + .../kafka/clients/FetchSessionHandler.java | 19 +++++++++++-------- .../clients/FetchSessionHandlerTest.java | 4 ++-- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml index d19a9b6f60f9f..76475ec386c97 100644 --- a/checkstyle/import-control-jmh-benchmarks.xml +++ b/checkstyle/import-control-jmh-benchmarks.xml @@ -38,6 +38,7 @@ + + diff --git a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java index a2cac592c13f3..c552f7bd37061 100644 --- a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java @@ -218,7 +218,6 @@ public FetchRequestData build() { return new FetchRequestData(toSend, Collections.emptyList(), toSend, nextMetadata); } - LinkedHashMap<TopicPartition, PartitionData> updated = new LinkedHashMap<>(); List<TopicPartition> added = new ArrayList<>(); List<TopicPartition> removed = new ArrayList<>(); List<TopicPartition> altered = new ArrayList<>(); @@ -227,13 +226,11 @@ public FetchRequestData build() { Entry<TopicPartition, PartitionData> entry = iter.next(); TopicPartition topicPartition = entry.getKey(); PartitionData prevData = entry.getValue(); - // process from next - removing the entry now so that later only - // added partitions are left PartitionData nextData = next.remove(topicPartition); if (nextData != null) { if (!prevData.equals(nextData)) { - // partition data was updated - updated.put(topicPartition, nextData); + // Re-add the altered partition to the end of 'next' + next.put(topicPartition, nextData); entry.setValue(nextData); altered.add(topicPartition); } @@ -244,12 +241,18 @@ public FetchRequestData build() { removed.add(topicPartition); } } - // Only the new partitions are left in next. Add these new partitions to the session + // Add any new partitions to the session. for (Entry<TopicPartition, PartitionData> entry : next.entrySet()) { TopicPartition topicPartition = entry.getKey(); PartitionData nextData = entry.getValue(); + if (sessionPartitions.containsKey(topicPartition)) { + // In the previous loop, all the partitions which existed in both sessionPartitions + // and next were moved to the end of next, or removed from next. Therefore, + // once we hit one of them, we know there are no more unseen entries to look + // at in next. + break; + } sessionPartitions.put(topicPartition, nextData); - updated.put(topicPartition, nextData); added.add(topicPartition); } if (log.isDebugEnabled()) { @@ -258,7 +261,7 @@ public FetchRequestData build() { partitionsToLogString(altered), partitionsToLogString(removed), partitionsToLogString(sessionPartitions.keySet())); } - Map<TopicPartition, PartitionData> toSend = Collections.unmodifiableMap(updated); + Map<TopicPartition, PartitionData> toSend = Collections.unmodifiableMap(next); Map<TopicPartition, PartitionData> curSessionPartitions = copySessionPartitions ?
Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions)) : Collections.unmodifiableMap(sessionPartitions); diff --git a/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java index 9e797ccfe5b75..ec1b0624169af 100644 --- a/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java @@ -248,8 +248,8 @@ public void testIncrementals() { new ReqEntry("foo", 1, 10, 120, 210), new ReqEntry("bar", 0, 20, 200, 200)), data2.sessionPartitions()); - assertMapEquals(reqMap(new ReqEntry("foo", 1, 10, 120, 210), - new ReqEntry("bar", 0, 20, 200, 200)), + assertMapEquals(reqMap(new ReqEntry("bar", 0, 20, 200, 200), + new ReqEntry("foo", 1, 10, 120, 210)), data2.toSend()); FetchResponse<MemoryRecords> resp2 = new FetchResponse<>(Errors.NONE, From 40728b38698652c66c1480ab07176c72e7ef7e23 Mon Sep 17 00:00:00 2001 From: Lucas Bradstreet Date: Fri, 11 Oct 2019 18:09:17 -0700 Subject: [PATCH 09/12] Spotbugs fixes --- gradle/spotbugs-exclude.xml | 2 ++ .../apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 1 - .../apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 07ca2b450e753..51d0e37671d4a 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -157,6 +157,8 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read + + diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java index b82785f746f31..44ef9a28eb4f1 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java @@ -80,7 +80,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java index ede9f13a4479c..9fa25139909b7 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java @@ -85,7 +85,7 @@ public void setUp() { null, null)); } - FetchSessionHandler.FetchRequestData data = builder.build(); + builder.build(); // build and handle an initial response so that the next fetch will be incremental handler.handleResponse(new FetchResponse<>(Errors.NONE, respMap, 0, 1)); From 1a0ed01d9d90b4b3310cd76944f644c840e70217 Mon Sep 17 00:00:00 2001 From: Lucas Bradstreet Date: Tue, 15 Oct 2019 10:06:00 -0700 Subject: [PATCH 10/12] Fix compile error on Scala 2.13 by making fetchFromLeader return a map --- .../main/scala/kafka/server/AbstractFetcherThread.scala | 4 ++-- .../scala/kafka/server/ReplicaAlterLogDirsThread.scala | 4 ++-- .../main/scala/kafka/server/ReplicaFetcherThread.scala | 6 +++--- .../unit/kafka/server/AbstractFetcherThreadTest.scala | 6 +++--- .../kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 7 +++---- 5
files changed, 13 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index f4a9bc8da74bb..f24962a5960b6 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -93,7 +93,7 @@ abstract class AbstractFetcherThread(name: String, protected def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] - protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)] + protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long @@ -281,7 +281,7 @@ abstract class AbstractFetcherThread(name: String, private def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData], fetchRequest: FetchRequest.Builder): Unit = { val partitionsWithError = mutable.Set[TopicPartition]() - var responseData: Seq[(TopicPartition, FetchData)] = Seq.empty + var responseData: Map[TopicPartition, FetchData] = Map.empty try { trace(s"Sending fetch request $fetchRequest") diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index 3b47b79626f08..c36de3d0b1db5 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -72,7 +72,7 @@ class ReplicaAlterLogDirsThread(name: String, replicaMgr.futureLocalLogOrException(topicPartition).endOffsetForEpoch(epoch) } - def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)] = { + def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = { var partitionData: Seq[(TopicPartition, FetchResponse.PartitionData[Records])] = null val request = fetchRequest.build() @@ -100,7 +100,7 @@ class ReplicaAlterLogDirsThread(name: String, if (partitionData == null) throw new IllegalStateException(s"Failed to fetch data for partitions ${request.fetchData.keySet().toArray.mkString(",")}") - partitionData + partitionData.toMap } // process fetched data diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 5108944008a62..83a5dea238529 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -196,14 +196,14 @@ class ReplicaFetcherThread(name: String, } - override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)] = { + override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = { try { val clientResponse = leaderEndpoint.sendRequest(fetchRequest) val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse[Records]] if (!fetchSessionHandler.handleResponse(fetchResponse)) { - Nil + Map.empty } else { - fetchResponse.responseData.asScala.toSeq + fetchResponse.responseData.asScala } } catch { case t: Throwable => diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala index af280ab4d45e3..17075219614dd 100644 --- 
a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala @@ -576,7 +576,7 @@ class AbstractFetcherThreadTest { val fetcher = new MockFetcherThread { var fetchedOnce = false - override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)] = { + override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = { val fetchedData = super.fetchFromLeader(fetchRequest) if (!fetchedOnce) { val records = fetchedData.head._2.records.asInstanceOf[MemoryRecords] @@ -976,7 +976,7 @@ class AbstractFetcherThreadTest { override protected def isOffsetForLeaderEpochSupported: Boolean = true - override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)] = { + override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = { fetchRequest.fetchData.asScala.map { case (partition, fetchData) => val leaderState = leaderPartitionState(partition) val epochCheckError = checkExpectedLeaderEpoch(fetchData.currentLeaderEpoch, leaderState) @@ -1003,7 +1003,7 @@ class AbstractFetcherThreadTest { (partition, new FetchData(error, leaderState.highWatermark, leaderState.highWatermark, leaderState.logStartOffset, List.empty.asJava, records)) - }.toSeq + }.toMap } private def checkLeaderEpochAndThrow(expectedEpoch: Int, partitionState: PartitionState): Unit = { diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java index 44ef9a28eb4f1..0635ff5338764 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java @@ -68,18 +68,17 @@ import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Warmup; import scala.Option; -import scala.Tuple2; import scala.collection.Iterator; import scala.collection.JavaConverters; import scala.compat.java8.OptionConverters; import scala.collection.Map; -import scala.collection.Seq; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -306,8 +305,8 @@ public Map<TopicPartition, EpochEndOffset> fetchEpochEndOffsets(Map<TopicPartition, - public Seq<Tuple2<TopicPartition, FetchResponse.PartitionData<Records>>> fetchFromLeader(FetchRequest.Builder fetchRequest) { - return JavaConverters.asScalaIteratorConverter(new ArrayList<Tuple2<TopicPartition, FetchResponse.PartitionData<Records>>>().iterator()).asScala().toSeq(); + public Map<TopicPartition, FetchResponse.PartitionData<Records>> fetchFromLeader(FetchRequest.Builder fetchRequest) { + return new scala.collection.mutable.HashMap<>(); } } } From 4a306ad7967e20712ec2c0ea085a91b5f471b9a3 Mon Sep 17 00:00:00 2001 From: Lucas Bradstreet Date: Tue, 15 Oct 2019 10:06:23 -0700 Subject: [PATCH 11/12] Pre-compute lags or pass them through fetch states where possible --- .../kafka/server/AbstractFetcherThread.scala | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index f24962a5960b6..17b607335e7be 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -390,7 +390,7 @@ abstract class AbstractFetcherThread(name: String,
try { Option(partitionStates.stateValue(topicPartition)).foreach { state => val newState = PartitionFetchState(math.min(truncationOffset, state.fetchOffset), - None, state.currentLeaderEpoch, state.delay, state = Truncating) + state.lag, state.currentLeaderEpoch, state.delay, state = Truncating) partitionStates.updateAndMoveToEnd(topicPartition, newState) partitionMapCond.signalAll() } @@ -415,12 +415,10 @@ abstract class AbstractFetcherThread(name: String, val currentState = partitionStates.stateValue(tp) val updatedState = if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.leaderEpoch) { currentState + } else if (initialFetchState.offset < 0) { + fetchOffsetAndTruncate(tp, initialFetchState.leaderEpoch) } else { - val initialFetchOffset = if (initialFetchState.offset < 0) - fetchOffsetAndTruncate(tp, initialFetchState.leaderEpoch) - else - initialFetchState.offset - PartitionFetchState(initialFetchOffset, None, initialFetchState.leaderEpoch, state = Truncating) + PartitionFetchState(initialFetchState.offset, None, initialFetchState.leaderEpoch, state = Truncating) } partitionStates.updateAndMoveToEnd(tp, updatedState) } @@ -442,7 +440,7 @@ abstract class AbstractFetcherThread(name: String, val maybeTruncationComplete = fetchOffsets.get(state.topicPartition) match { case Some(offsetTruncationState) => val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating - PartitionFetchState(offsetTruncationState.offset, None, + PartitionFetchState(offsetTruncationState.offset, currentFetchState.lag, currentFetchState.currentLeaderEpoch, currentFetchState.delay, state) case None => currentFetchState } @@ -530,11 +528,10 @@ abstract class AbstractFetcherThread(name: String, private def handleOutOfRangeError(topicPartition: TopicPartition, fetchState: PartitionFetchState): Boolean = { try { - val newOffset = fetchOffsetAndTruncate(topicPartition, fetchState.currentLeaderEpoch) - val newFetchState = PartitionFetchState(newOffset, None, fetchState.currentLeaderEpoch, state = Fetching) + val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.currentLeaderEpoch) partitionStates.updateAndMoveToEnd(topicPartition, newFetchState) info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " + - s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset") + s"out of range, which typically implies a leader change. Reset fetch offset to ${newFetchState.fetchOffset}") true } catch { case _: FencedLeaderEpochException => @@ -556,7 +553,7 @@ abstract class AbstractFetcherThread(name: String, /** * Handle a partition whose offset is out of range and return a new fetch offset. 
*/ - protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = { + protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, currentLeaderEpoch: Int): PartitionFetchState = { val replicaEndOffset = logEndOffset(topicPartition) /** @@ -574,7 +571,9 @@ abstract class AbstractFetcherThread(name: String, warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " + s"leader's latest offset $leaderEndOffset") truncate(topicPartition, OffsetTruncationState(leaderEndOffset, truncationCompleted = true)) - leaderEndOffset + + fetcherLagStats.getAndMaybePut(topicPartition).lag = 0 + PartitionFetchState(leaderEndOffset, Some(0), currentLeaderEpoch, state = Fetching) } else { /** * If the leader's log end offset is greater than the follower's log end offset, there are two possibilities: @@ -604,7 +603,10 @@ abstract class AbstractFetcherThread(name: String, // Only truncate log when current leader's log start offset is greater than follower's log end offset. if (leaderStartOffset > replicaEndOffset) truncateFullyAndStartAt(topicPartition, leaderStartOffset) - offsetToFetch + + val initialLag = leaderEndOffset - offsetToFetch + fetcherLagStats.getAndMaybePut(topicPartition).lag = initialLag + PartitionFetchState(offsetToFetch, Some(initialLag), currentLeaderEpoch, state = Fetching) } } @@ -615,7 +617,7 @@ abstract class AbstractFetcherThread(name: String, Option(partitionStates.stateValue(partition)).foreach { currentFetchState => if (!currentFetchState.isDelayed) { partitionStates.updateAndMoveToEnd(partition, PartitionFetchState(currentFetchState.fetchOffset, - None, currentFetchState.currentLeaderEpoch, Some(new DelayedItem(delay)), currentFetchState.state)) + currentFetchState.lag, currentFetchState.currentLeaderEpoch, Some(new DelayedItem(delay)), currentFetchState.state)) } } } From 1aa07f40ac1c69349f51604ba85e44344e1644d2 Mon Sep 17 00:00:00 2001 From: Lucas Bradstreet Date: Tue, 15 Oct 2019 15:26:07 -0700 Subject: [PATCH 12/12] Output lag in PartitionFetchState toString --- core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 17b607335e7be..5c65bed128ff6 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -782,6 +782,7 @@ case class PartitionFetchState(fetchOffset: Long, s"FetchState(fetchOffset=$fetchOffset" + s", currentLeaderEpoch=$currentLeaderEpoch" + s", state=$state" + + s", lag=$lag" + s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" + s")" }
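For reference, the fetch state this series converges on, as a condensed sketch (not the full class: ReplicaState and Fetching come from kafka.server, the helper methods are abbreviated, and isReplicaInSync is a hypothetical illustration of how a pre-computed lag avoids recomputing lag from the log end offset on every fetch):

    import java.util.concurrent.TimeUnit
    import kafka.utils.DelayedItem

    case class PartitionFetchState(fetchOffset: Long,
                                   lag: Option[Long], // pre-computed lag; None until first known
                                   currentLeaderEpoch: Int,
                                   delay: Option[DelayedItem] = None,
                                   state: ReplicaState) {
      def isReadyForFetch: Boolean = state == Fetching && !isDelayed
      def isDelayed: Boolean = delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)
      // Hypothetical helper: with lag carried in the state, an in-sync check
      // becomes a simple comparison rather than a per-fetch offset lookup.
      def isReplicaInSync: Boolean = lag.exists(_ <= 0)
      override def toString: String = {
        s"FetchState(fetchOffset=$fetchOffset" +
          s", currentLeaderEpoch=$currentLeaderEpoch" +
          s", state=$state" +
          s", lag=$lag" +
          s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" +
          s")"
      }
    }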