diff --git a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala index 7a4d0ba14ffa0..5b90e248be57c 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala @@ -50,7 +50,7 @@ class ControllerChannelManagerTest { @Test def testLeaderAndIsrRequestSent(): Unit = { - val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty) + val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar")) val batch = new MockControllerBrokerRequestBatch(context) val partitions = Map( @@ -98,7 +98,7 @@ class ControllerChannelManagerTest { @Test def testLeaderAndIsrRequestIsNew(): Unit = { - val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty) + val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar")) val batch = new MockControllerBrokerRequestBatch(context) val partition = new TopicPartition("foo", 0) @@ -126,7 +126,7 @@ class ControllerChannelManagerTest { @Test def testLeaderAndIsrRequestSentToLiveOrShuttingDownBrokers(): Unit = { - val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty) + val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar")) val batch = new MockControllerBrokerRequestBatch(context) // 2 is shutting down, 3 is dead @@ -176,7 +176,7 @@ class ControllerChannelManagerTest { private def testLeaderAndIsrRequestFollowsInterBrokerProtocolVersion(interBrokerProtocolVersion: ApiVersion, expectedLeaderAndIsrVersion: Short): Unit = { - val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty) + val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar")) val config = createConfig(interBrokerProtocolVersion) val batch = new MockControllerBrokerRequestBatch(context, config) @@ -215,7 +215,7 @@ class ControllerChannelManagerTest { def testUpdateMetadataRequestSent(): Unit = { val topicIds = Map("foo" -> Uuid.randomUuid(), "bar" -> Uuid.randomUuid()) - val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, topicIds) + val context = initContext(Seq(1, 2, 3), 2, 3, topicIds) val batch = new MockControllerBrokerRequestBatch(context) val partitions = Map( @@ -264,7 +264,7 @@ class ControllerChannelManagerTest { @Test def testUpdateMetadataDoesNotIncludePartitionsWithoutLeaderAndIsr(): Unit = { - val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty) + val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar")) val batch = new MockControllerBrokerRequestBatch(context) val partitions = Set( @@ -292,7 +292,7 @@ class ControllerChannelManagerTest { @Test def testUpdateMetadataRequestDuringTopicDeletion(): Unit = { - val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty) + val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar")) val batch = new MockControllerBrokerRequestBatch(context) val partitions = Map( @@ -334,7 +334,7 @@ class ControllerChannelManagerTest { @Test def testUpdateMetadataIncludesLiveOrShuttingDownBrokers(): Unit = { - val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty) + val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar")) val batch = new MockControllerBrokerRequestBatch(context) // 2 is shutting down, 3 is dead @@ -379,7 +379,7 @@ class ControllerChannelManagerTest { private def testUpdateMetadataFollowsInterBrokerProtocolVersion(interBrokerProtocolVersion: ApiVersion, expectedUpdateMetadataVersion: Short): Unit = { - val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty) + val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar")) val config = createConfig(interBrokerProtocolVersion) val batch = new MockControllerBrokerRequestBatch(context, config) @@ -400,7 +400,7 @@ class ControllerChannelManagerTest { @Test def testStopReplicaRequestSent(): Unit = { - val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty) + val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar")) val batch = new MockControllerBrokerRequestBatch(context) val partitions = Map( @@ -435,7 +435,7 @@ class ControllerChannelManagerTest { @Test def testStopReplicaRequestWithAlreadyDefinedDeletedPartition(): Unit = { - val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty) + val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar")) val batch = new MockControllerBrokerRequestBatch(context) val partition = new TopicPartition("foo", 0) @@ -463,7 +463,7 @@ class ControllerChannelManagerTest { } private def testStopReplicaRequestsWhileTopicQueuedForDeletion(interBrokerProtocolVersion: ApiVersion): Unit = { - val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty) + val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar")) val config = createConfig(interBrokerProtocolVersion) val batch = new MockControllerBrokerRequestBatch(context, config) @@ -510,7 +510,7 @@ class ControllerChannelManagerTest { } private def testStopReplicaRequestsWhileTopicDeletionStarted(interBrokerProtocolVersion: ApiVersion): Unit = { - val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty) + val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar")) val config = createConfig(interBrokerProtocolVersion) val batch = new MockControllerBrokerRequestBatch(context, config) @@ -565,7 +565,7 @@ class ControllerChannelManagerTest { } private def testStopReplicaRequestWithoutDeletePartitionWhileTopicDeletionStarted(interBrokerProtocolVersion: ApiVersion): Unit = { - val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty) + val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar")) val config = createConfig(interBrokerProtocolVersion) val batch = new MockControllerBrokerRequestBatch(context, config) @@ -623,7 +623,7 @@ class ControllerChannelManagerTest { private def testMixedDeleteAndNotDeleteStopReplicaRequests(interBrokerProtocolVersion: ApiVersion, expectedStopReplicaRequestVersion: Short): Unit = { - val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty) + val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar")) val config = createConfig(interBrokerProtocolVersion) val batch = new MockControllerBrokerRequestBatch(context, config) @@ -682,7 +682,7 @@ class ControllerChannelManagerTest { @Test def testStopReplicaGroupsByBroker(): Unit = { - val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty) + val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar")) val batch = new MockControllerBrokerRequestBatch(context) val partitions = Map( @@ -720,7 +720,7 @@ class ControllerChannelManagerTest { @Test def testStopReplicaSentOnlyToLiveAndShuttingDownBrokers(): Unit = { - val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3, Map.empty) + val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo", "bar")) val batch = new MockControllerBrokerRequestBatch(context) // 2 is shutting down, 3 is dead @@ -772,7 +772,7 @@ class ControllerChannelManagerTest { private def testStopReplicaFollowsInterBrokerProtocolVersion(interBrokerProtocolVersion: ApiVersion, expectedStopReplicaRequestVersion: Short): Unit = { - val context = initContext(Seq(1, 2, 3), Set("foo"), 2, 3, Map.empty) + val context = initContext(Seq(1, 2, 3), 2, 3, Set("foo")) val config = createConfig(interBrokerProtocolVersion) val batch = new MockControllerBrokerRequestBatch(context, config) @@ -884,7 +884,12 @@ class ControllerChannelManagerTest { private def replicaAssignment(replicas: Seq[Int]): ReplicaAssignment = ReplicaAssignment(replicas, Seq(), Seq()) private def initContext(brokers: Seq[Int], - topics: Set[String], + numPartitions: Int, + replicationFactor: Int, + topics: Set[String]): ControllerContext = initContext(brokers, numPartitions, + replicationFactor, topics.map(_ -> Uuid.randomUuid()).toMap) + + private def initContext(brokers: Seq[Int], numPartitions: Int, replicationFactor: Int, topicIds: Map[String, Uuid]): ControllerContext = { @@ -896,15 +901,12 @@ class ControllerChannelManagerTest { }.toMap context.setLiveBrokers(brokerEpochs) - context.setAllTopics(topics) - - for (topic <- topics) { - context.addTopicId(topic, Uuid.randomUuid()) - } + context.setAllTopics(topicIds.keySet) + topicIds.foreach { case (name, id) => context.addTopicId(name, id) } // Simple round-robin replica assignment var leaderIndex = 0 - for (topic <- topics; partitionId <- 0 until numPartitions) { + for (topic <- topicIds.keys; partitionId <- 0 until numPartitions) { val partition = new TopicPartition(topic, partitionId) val replicas = (0 until replicationFactor).map { i => val replica = brokers((i + leaderIndex) % brokers.size) @@ -914,8 +916,6 @@ class ControllerChannelManagerTest { leaderIndex += 1 } - context.allTopics ++= topics - topicIds.foreach { case (name, id) => context.addTopicId(name, id) } context }