diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala index c365d5fb84179..974877b7afbd2 100644 --- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala @@ -56,10 +56,8 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness { } val unthrottledBrokerConfigs = - 0.to(4).map { - case brokerId => (brokerId, brokerLevelThrottles.map { - case throttleName => (throttleName, -1L) - }.toMap) + 0.to(4).map { brokerId => + brokerId -> brokerLevelThrottles.map(throttle => (throttle, -1L)).toMap }.toMap /** @@ -277,15 +275,15 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness { def testCancellation(): Unit = { cluster = new ReassignPartitionsTestCluster(zkConnect) cluster.setup() - cluster.produceMessages("foo", 0, 60) - cluster.produceMessages("baz", 1, 80) + cluster.produceMessages("foo", 0, 200) + cluster.produceMessages("baz", 1, 200) val assignment = """{"version":1,"partitions":""" + """[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]},""" + """{"topic":"baz","partition":1,"replicas":[0,2,3],"log_dirs":["any","any","any"]}""" + """]}""" assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq)) - val interBrokerThrottle = 100L + val interBrokerThrottle = 1L runExecuteAssignment(cluster.adminClient, false, assignment, interBrokerThrottle, -1L) val throttledConfigMap = Map[String, Long]( brokerLevelLeaderThrottle -> interBrokerThrottle, @@ -303,10 +301,9 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness { // Verify that the reassignment is running. The very low throttle should keep it // from completing before this runs. waitForVerifyAssignment(cluster.adminClient, assignment, true, - VerifyAssignmentResult(Map(new TopicPartition("foo", 0) -> - PartitionReassignmentState(Seq(0, 1, 3, 2), Seq(0, 1, 3), false), - new TopicPartition("baz", 1) -> - PartitionReassignmentState(Seq(0, 2, 3, 1), Seq(0, 2, 3), false)), + VerifyAssignmentResult(Map( + new TopicPartition("foo", 0) -> PartitionReassignmentState(Seq(0, 1, 3, 2), Seq(0, 1, 3), false), + new TopicPartition("baz", 1) -> PartitionReassignmentState(Seq(0, 2, 3, 1), Seq(0, 2, 3), false)), true, Map(), false)) // Cancel the reassignment. assertEquals((Set( @@ -339,13 +336,12 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness { * information. The nested maps are keyed on throttle name. */ private def describeBrokerLevelThrottles(brokerIds: Seq[Int]): Map[Int, Map[String, Long]] = { - brokerIds.map { - case brokerId => - val props = zkClient.getEntityConfigs("brokers", brokerId.toString) - (brokerId, brokerLevelThrottles.map { - case throttleName => (throttleName, - props.getOrDefault(throttleName, "-1").asInstanceOf[String].toLong) - }.toMap) + brokerIds.map { brokerId => + val props = zkClient.getEntityConfigs("brokers", brokerId.toString) + val throttles = brokerLevelThrottles.map { throttleName => + (throttleName, props.getOrDefault(throttleName, "-1").asInstanceOf[String].toLong) + }.toMap + brokerId -> throttles }.toMap }