Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,8 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
}

val unthrottledBrokerConfigs =
0.to(4).map {
case brokerId => (brokerId, brokerLevelThrottles.map {
case throttleName => (throttleName, -1L)
}.toMap)
0.to(4).map { brokerId =>
brokerId -> brokerLevelThrottles.map(throttle => (throttle, -1L)).toMap
}.toMap

/**
Expand Down Expand Up @@ -277,15 +275,15 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
def testCancellation(): Unit = {
cluster = new ReassignPartitionsTestCluster(zkConnect)
cluster.setup()
cluster.produceMessages("foo", 0, 60)
cluster.produceMessages("baz", 1, 80)
cluster.produceMessages("foo", 0, 200)
cluster.produceMessages("baz", 1, 200)
val assignment = """{"version":1,"partitions":""" +
"""[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]},""" +
"""{"topic":"baz","partition":1,"replicas":[0,2,3],"log_dirs":["any","any","any"]}""" +
"""]}"""
assertEquals(unthrottledBrokerConfigs,
describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
val interBrokerThrottle = 100L
val interBrokerThrottle = 1L
runExecuteAssignment(cluster.adminClient, false, assignment, interBrokerThrottle, -1L)
val throttledConfigMap = Map[String, Long](
brokerLevelLeaderThrottle -> interBrokerThrottle,
Expand All @@ -303,10 +301,9 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
// Verify that the reassignment is running. The very low throttle should keep it
// from completing before this runs.
waitForVerifyAssignment(cluster.adminClient, assignment, true,
VerifyAssignmentResult(Map(new TopicPartition("foo", 0) ->
PartitionReassignmentState(Seq(0, 1, 3, 2), Seq(0, 1, 3), false),
new TopicPartition("baz", 1) ->
PartitionReassignmentState(Seq(0, 2, 3, 1), Seq(0, 2, 3), false)),
VerifyAssignmentResult(Map(
new TopicPartition("foo", 0) -> PartitionReassignmentState(Seq(0, 1, 3, 2), Seq(0, 1, 3), false),
new TopicPartition("baz", 1) -> PartitionReassignmentState(Seq(0, 2, 3, 1), Seq(0, 2, 3), false)),
true, Map(), false))
// Cancel the reassignment.
assertEquals((Set(
Expand Down Expand Up @@ -339,13 +336,12 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
* information. The nested maps are keyed on throttle name.
*/
private def describeBrokerLevelThrottles(brokerIds: Seq[Int]): Map[Int, Map[String, Long]] = {
brokerIds.map {
case brokerId =>
val props = zkClient.getEntityConfigs("brokers", brokerId.toString)
(brokerId, brokerLevelThrottles.map {
case throttleName => (throttleName,
props.getOrDefault(throttleName, "-1").asInstanceOf[String].toLong)
}.toMap)
brokerIds.map { brokerId =>
val props = zkClient.getEntityConfigs("brokers", brokerId.toString)
val throttles = brokerLevelThrottles.map { throttleName =>
(throttleName, props.getOrDefault(throttleName, "-1").asInstanceOf[String].toLong)
}.toMap
brokerId -> throttles
}.toMap
}

Expand Down