From 80cebecc431d171ec6281c921860a1d521b9cb89 Mon Sep 17 00:00:00 2001 From: Dhruvil Shah Date: Fri, 25 Jan 2019 15:12:35 -0800 Subject: [PATCH] Ensure offline partitions are picked up as soon as possible when shrinking ISR --- core/src/main/scala/kafka/server/ReplicaManager.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 5e41e35f87981..955701a93e37a 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -411,9 +411,13 @@ class ReplicaManager(val config: KafkaConfig, def nonOfflinePartition(topicPartition: TopicPartition): Option[Partition] = getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition) + // An iterator over all non offline partitions. This is a weakly consistent iterator; a partition made offline after + // the iterator has been constructed could still be returned by this iterator. private def nonOfflinePartitionsIterator: Iterator[Partition] = allPartitions.values.iterator.filter(_ ne ReplicaManager.OfflinePartition) + // An iterator over all offline partitions. This is a weakly consistent iterator; a partition made offline after the + // iterator has been constructed may not be visible. private def offlinePartitionsIterator: Iterator[Partition] = allPartitions.values.iterator.filter(_ eq ReplicaManager.OfflinePartition) @@ -1356,7 +1360,11 @@ class ReplicaManager(val config: KafkaConfig, private def maybeShrinkIsr(): Unit = { trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR") - nonOfflinePartitionsIterator.foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs)) + + // Shrink ISRs for non offline partitions + allPartitions.keys.foreach { topicPartition => + nonOfflinePartition(topicPartition).foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs)) + } } /**