From 446717166dead4da44171e6fb3eafe34cdddfbd6 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Fri, 22 Feb 2019 08:52:24 -0800 Subject: [PATCH 1/5] MINOR: Refactor replica log dir fetching for improved logging --- .../server/ReplicaAlterLogDirsThread.scala | 69 ++++++++++++++----- 1 file changed, 52 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index 54bb2a2248740..77a22d3593acd 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -51,6 +51,7 @@ class ReplicaAlterLogDirsThread(name: String, private val replicaId = brokerConfig.brokerId private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes private val fetchSize = brokerConfig.replicaFetchMaxBytes + private var inProgressPartition: Option[TopicPartition] = None override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = { replicaMgr.futureLocalReplicaOrException(topicPartition).latestEpoch @@ -185,32 +186,51 @@ class ReplicaAlterLogDirsThread(name: String, partition.truncateFullyAndStartAt(offset, isFuture = true) } - def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = { - // Only include replica in the fetch request if it is not throttled. - val maxPartitionOpt = partitionMap.filter { case (_, partitionFetchState) => - partitionFetchState.isReadyForFetch && !quota.isQuotaExceeded + private def nextReadyPartition(partitionMap: Map[TopicPartition, PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = { + partitionMap.filter { case (_, partitionFetchState) => + partitionFetchState.isReadyForFetch }.reduceLeftOption { (left, right) => - if ((left._1.topic > right._1.topic()) || (left._1.topic == right._1.topic() && left._1.partition() >= right._1.partition())) + if ((left._1.topic < right._1.topic) || (left._1.topic == right._1.topic && left._1.partition < right._1.partition)) left else right } + } + + private def selectPartitionToFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = { + // Only move one partition at a time to increase its catch-up rate and thus reduce the time spent on + // moving any given replica. Replicas are selected in ascending order (lexicographically for topics) from the + // partition that are ready to fetch. Once selected, we will continue fetching the same partition until it + // becomes unavailable. + + inProgressPartition.foreach { tp => + val fetchStateOpt = partitionMap.get(tp) + fetchStateOpt.filter(_.isReadyForFetch).foreach { fetchState => + return Some((tp, fetchState)) + } + } + + inProgressPartition = None + + val nextPartitionOpt = nextReadyPartition(partitionMap) + nextPartitionOpt.foreach { case (tp, fetchState) => + inProgressPartition = Some(tp) + info(s"Beginning or resuming log dir move of partition $tp beginning from offset ${fetchState.fetchOffset}") + } + nextPartitionOpt + } - // Only move one replica at a time to increase its catch-up rate and thus reduce the time spent on moving any given replica - // Replicas are ordered by their TopicPartition + private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[FetchRequest.Builder]] = { val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] val partitionsWithError = mutable.Set[TopicPartition]() - if (maxPartitionOpt.nonEmpty) { - val (topicPartition, partitionFetchState) = maxPartitionOpt.get - try { - val logStartOffset = replicaMgr.futureLocalReplicaOrException(topicPartition).logStartOffset - requestMap.put(topicPartition, new FetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset, - fetchSize, Optional.of(partitionFetchState.currentLeaderEpoch))) - } catch { - case _: KafkaStorageException => - partitionsWithError += topicPartition - } + try { + val logStartOffset = replicaMgr.futureLocalReplicaOrException(tp).logStartOffset + requestMap.put(tp, new FetchRequest.PartitionData(fetchState.fetchOffset, logStartOffset, + fetchSize, Optional.of(fetchState.currentLeaderEpoch))) + } catch { + case _: KafkaStorageException => + partitionsWithError += tp } val fetchRequestOpt = if (requestMap.isEmpty) { @@ -221,7 +241,22 @@ class ReplicaAlterLogDirsThread(name: String, Some(FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap) .setMaxBytes(maxBytes)) } + ResultWithPartitions(fetchRequestOpt, partitionsWithError) } + def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = { + // Only include replica in the fetch request if it is not throttled. + if (quota.isQuotaExceeded) { + ResultWithPartitions(None, Set.empty) + } else { + selectPartitionToFetch(partitionMap) match { + case Some((tp, fetchState)) => + buildFetchForPartition(tp, fetchState) + case None => + ResultWithPartitions(None, Set.empty) + } + } + } + } From 2f4003eaba9db347be4d9bb50d012d44dca6180c Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Fri, 22 Feb 2019 15:54:02 -0800 Subject: [PATCH 2/5] Alter test case for ascending order of handling --- .../unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala index d21e1e1c16c35..3cc8c9bb904b0 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala @@ -526,8 +526,8 @@ class ReplicaAlterLogDirsThreadTest { assertEquals(0, request.minBytes) val fetchInfos = request.fetchData.asScala.toSeq assertEquals(1, fetchInfos.length) - assertEquals("Expected fetch request for largest partition", t1p1, fetchInfos.head._1) - assertEquals(160, fetchInfos.head._2.fetchOffset) + assertEquals("Expected fetch request for largest partition", t1p0, fetchInfos.head._1) + assertEquals(150, fetchInfos.head._2.fetchOffset) } @Test From 5c5c808f75c7c116b01d11a2f43bfe8ca5f3d9ea Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Fri, 22 Feb 2019 16:01:53 -0800 Subject: [PATCH 3/5] Also print how many partitions are remaining --- .../main/scala/kafka/server/ReplicaAlterLogDirsThread.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index 77a22d3593acd..fdd046e4a6f9d 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -215,7 +215,8 @@ class ReplicaAlterLogDirsThread(name: String, val nextPartitionOpt = nextReadyPartition(partitionMap) nextPartitionOpt.foreach { case (tp, fetchState) => inProgressPartition = Some(tp) - info(s"Beginning or resuming log dir move of partition $tp beginning from offset ${fetchState.fetchOffset}") + info(s"Beginning/resuming copy of partition $tp from offset ${fetchState.fetchOffset}. " + + s"Including this partition, there are ${partitionMap.size} remaining partitions to copy by this thread.") } nextPartitionOpt } From 22fa9a379d7975fde49158379008726a42d7359d Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Fri, 22 Feb 2019 18:04:59 -0800 Subject: [PATCH 4/5] Minor grammar fix --- .../main/scala/kafka/server/ReplicaAlterLogDirsThread.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index fdd046e4a6f9d..07fd684cefe7e 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -199,8 +199,8 @@ class ReplicaAlterLogDirsThread(name: String, private def selectPartitionToFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = { // Only move one partition at a time to increase its catch-up rate and thus reduce the time spent on - // moving any given replica. Replicas are selected in ascending order (lexicographically for topics) from the - // partition that are ready to fetch. Once selected, we will continue fetching the same partition until it + // moving any given replica. Replicas are selected in ascending order (lexicographically by topic) from the + // partitions that are ready to fetch. Once selected, we will continue fetching the same partition until it // becomes unavailable. inProgressPartition.foreach { tp => From f85e26bb630c57c626e26204051c74dcc68b4a01 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Mon, 25 Feb 2019 18:36:43 -0800 Subject: [PATCH 5/5] A couple doc fixes per review comments --- .../src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala | 2 +- .../scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index 07fd684cefe7e..8df234e65a317 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -201,7 +201,7 @@ class ReplicaAlterLogDirsThread(name: String, // Only move one partition at a time to increase its catch-up rate and thus reduce the time spent on // moving any given replica. Replicas are selected in ascending order (lexicographically by topic) from the // partitions that are ready to fetch. Once selected, we will continue fetching the same partition until it - // becomes unavailable. + // becomes unavailable or is removed. inProgressPartition.foreach { tp => val fetchStateOpt = partitionMap.get(tp) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala index 3cc8c9bb904b0..779c0e53c54bf 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala @@ -526,7 +526,7 @@ class ReplicaAlterLogDirsThreadTest { assertEquals(0, request.minBytes) val fetchInfos = request.fetchData.asScala.toSeq assertEquals(1, fetchInfos.length) - assertEquals("Expected fetch request for largest partition", t1p0, fetchInfos.head._1) + assertEquals("Expected fetch request for first partition", t1p0, fetchInfos.head._1) assertEquals(150, fetchInfos.head._2.fetchOffset) }