KAFKA-2960: Clear purgatory for partitions before becoming follower#1018
KAFKA-2960: Clear purgatory for partitions before becoming follower#1018becketqin wants to merge 7 commits into
Conversation
| } | ||
|
|
||
| logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap) | ||
| partitionsToMakeFollower.foreach(partition => { |
There was a problem hiding this comment.
Can you add a detailed description of the fix somewhere? It makes sense intuitively, but it would be good it you explained the impact of this.
There was a problem hiding this comment.
Nitpick: the following syntax is a bit cleaner:
partitionsToMakeFollower.foreach { partition =>
...
}There was a problem hiding this comment.
Related to @auradkar's point, do we need to update the comment in makeFollowers?
|
This seems OK to me, but it would be good to get input from someone who knows this code better. |
|
@ijuma Thanks for the review. @junrao @guozhangwang Would you be able to take a look? |
| /** | ||
| * Test the case where produce requests in the purgatory should be cleared when the leader migrates to another broker. | ||
| */ | ||
| class ClearPurgatoryOnLeaderMovementTest extends KafkaServerTestHarness { |
There was a problem hiding this comment.
I haven't checked carefully enough if this is possible, but could you just mock out the ReplicaManager and avoid the full test harness? and if so, can you fold this into the ReplicaManagerTest
|
+1 on the change except a minor comment on the test. |
| * limitations under the License. | ||
| */ | ||
|
|
||
| package integration.kafka.api |
There was a problem hiding this comment.
The package name should be kafka.api only
| logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap) | ||
| partitionsToMakeFollower.foreach { partition => | ||
| val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topic, partition.partitionId) | ||
| tryCompleteDelayedProduce(topicPartitionOperationKey) |
There was a problem hiding this comment.
In DelayedProduce, we don't recheck if the leader has changed or now. Is tryCompleteDelayedProduce() enough to force the produce request to be completed? We probably need to add a forceComplete() in delayedProducePurgatory?
There was a problem hiding this comment.
I was wondering about this as well, but I think we do this right? i.e., it makes a call to Partition.checkEnoughReplicasReachOffset which returns (false, Errors.NOT_LEADER_FOR_PARTITION.code) if the leader is non-local
There was a problem hiding this comment.
Yes, that's right. Then the patch LGTM.
| @@ -49,7 +50,7 @@ class ReplicaManagerTest { | |||
| val zkClient = EasyMock.createMock(classOf[ZkClient]) | |||
| val zkUtils = ZkUtils(zkClient, false) | |||
There was a problem hiding this comment.
Can you make this also use a named argument?
|
Test looks much better now. Can you also test that delayed fetch requests are cleared? |
| // Make this replica the follower | ||
| val leaderAndIsrRequest2 = new LeaderAndIsrRequest(0, 0, | ||
| collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 1, 1, Seq(0, 1).asJava.asInstanceOf[java.util.List[Integer]], | ||
| 0, Set(0, 1).asJava.asInstanceOf[java.util.Set[Integer]])).asJava, |
There was a problem hiding this comment.
Sorry to be a pain, but we don't need the asInstanceOf. See the following:
scala> import scala.collection.JavaConverters._
import scala.collection.JavaConverters._
scala> val list: java.util.List[Integer] = Seq(0, 1)
<console>:13: error: type mismatch;
found : Seq[Int]
required: java.util.List[Integer]
val list: java.util.List[Integer] = Seq(0, 1)
^
scala> val list: java.util.List[Integer] = Seq(0, 1).asJava
<console>:13: error: type mismatch;
found : java.util.List[Int]
required: java.util.List[Integer]
val list: java.util.List[Integer] = Seq(0, 1).asJava
^
scala> val list: java.util.List[Integer] = Seq[Integer](0, 1).asJava
list: java.util.List[Integer] = [0, 1]There was a problem hiding this comment.
This is to show that if you pass the right type parameter when constructing the Seq, then auto-boxing happens during construction and a lot of ugliness is avoided.
|
Thanks, the patch LGTM. |
No description provided.