Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 53 additions & 17 deletions core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class ReplicaAlterLogDirsThread(name: String,
private val replicaId = brokerConfig.brokerId
private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
private val fetchSize = brokerConfig.replicaFetchMaxBytes
private var inProgressPartition: Option[TopicPartition] = None

override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
replicaMgr.futureLocalReplicaOrException(topicPartition).latestEpoch
Expand Down Expand Up @@ -185,32 +186,52 @@ class ReplicaAlterLogDirsThread(name: String,
partition.truncateFullyAndStartAt(offset, isFuture = true)
}

def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
// Only include replica in the fetch request if it is not throttled.
val maxPartitionOpt = partitionMap.filter { case (_, partitionFetchState) =>
partitionFetchState.isReadyForFetch && !quota.isQuotaExceeded
/**
 * Returns the ready-to-fetch partition with the smallest TopicPartition
 * (ordered lexicographically by topic name, then by partition number),
 * or None if no partition is currently ready for fetch.
 */
private def nextReadyPartition(partitionMap: Map[TopicPartition, PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = {
  partitionMap.filter { case (_, partitionFetchState) =>
    partitionFetchState.isReadyForFetch
  }.reduceLeftOption { (left, right) =>
    // Keep the smaller of the two keys: topic name first, then partition id.
    if ((left._1.topic < right._1.topic) || (left._1.topic == right._1.topic && left._1.partition < right._1.partition))
      left
    else
      right
  }
}

/**
 * Selects the partition this thread should fetch next.
 *
 * Only one partition is moved at a time to increase its catch-up rate and thus
 * reduce the time spent on moving any given replica. If the partition we were
 * already copying is still present and ready, we continue with it; otherwise
 * we pick the next ready partition in ascending (topic, partition) order and
 * remember it as in-progress.
 */
private def selectPartitionToFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): Option[(TopicPartition, PartitionFetchState)] = {
  // Continue with the in-progress partition while it remains fetchable.
  // (Rewritten to avoid a nonlocal `return` inside a lambda, which Scala
  // implements by throwing NonLocalReturnControl.)
  val inProgress = inProgressPartition.flatMap { tp =>
    partitionMap.get(tp).filter(_.isReadyForFetch).map(fetchState => (tp, fetchState))
  }

  inProgress.orElse {
    // The previous partition (if any) is gone or not ready; pick a new one.
    inProgressPartition = None

    val nextPartitionOpt = nextReadyPartition(partitionMap)
    nextPartitionOpt.foreach { case (tp, fetchState) =>
      inProgressPartition = Some(tp)
      info(s"Beginning/resuming copy of partition $tp from offset ${fetchState.fetchOffset}. " +
        s"Including this partition, there are ${partitionMap.size} remaining partitions to copy by this thread.")
    }
    nextPartitionOpt
  }
}

// Only move one replica at a time to increase its catch-up rate and thus reduce the time spent on moving any given replica
// Replicas are ordered by their TopicPartition
/**
 * Builds a fetch request for a single partition whose replica is being moved
 * between log directories on this broker.
 *
 * Reads the future replica's log start offset and requests data from the
 * partition's current fetch offset. If the log dir has failed
 * (KafkaStorageException), the partition is reported back via
 * partitionsWithError instead of being included in the request.
 */
private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[FetchRequest.Builder]] = {
  val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
  val partitionsWithError = mutable.Set[TopicPartition]()

  try {
    val logStartOffset = replicaMgr.futureLocalReplicaOrException(tp).logStartOffset
    requestMap.put(tp, new FetchRequest.PartitionData(fetchState.fetchOffset, logStartOffset,
      fetchSize, Optional.of(fetchState.currentLeaderEpoch)))
  } catch {
    case _: KafkaStorageException =>
      // The future replica's log dir is unavailable; surface the partition as errored.
      partitionsWithError += tp
  }

  val fetchRequestOpt = if (requestMap.isEmpty) {
    // NOTE(review): this else/None branch was hidden behind a collapsed diff
    // region in the reviewed text — verify against the upstream file.
    None
  } else {
    Some(FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap)
      .setMaxBytes(maxBytes))
  }

  ResultWithPartitions(fetchRequestOpt, partitionsWithError)
}

/**
 * Builds the next inter-log-dir fetch request, if any.
 *
 * Nothing is fetched while the replication quota is exceeded; otherwise a
 * single partition is selected and a fetch request is built for it.
 */
def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
  // Only include the replica in the fetch request if it is not throttled.
  if (quota.isQuotaExceeded) {
    ResultWithPartitions(None, Set.empty)
  } else {
    selectPartitionToFetch(partitionMap)
      .map { case (tp, fetchState) => buildFetchForPartition(tp, fetchState) }
      .getOrElse(ResultWithPartitions(None, Set.empty))
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -526,8 +526,8 @@ class ReplicaAlterLogDirsThreadTest {
assertEquals(0, request.minBytes)
val fetchInfos = request.fetchData.asScala.toSeq
assertEquals(1, fetchInfos.length)
assertEquals("Expected fetch request for largest partition", t1p1, fetchInfos.head._1)
assertEquals(160, fetchInfos.head._2.fetchOffset)
assertEquals("Expected fetch request for first partition", t1p0, fetchInfos.head._1)
assertEquals(150, fetchInfos.head._2.fetchOffset)
}

@Test
Expand Down