Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 30 additions & 23 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1569,13 +1569,15 @@ class KafkaController(val config: KafkaConfig,
val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]

reassignments.foreach { case (tp, targetReplicas) =>
if (replicasAreValid(tp, targetReplicas)) {
maybeBuildReassignment(tp, targetReplicas) match {
case Some(context) => partitionsToReassign.put(tp, context)
case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
}
} else {
reassignmentResults.put(tp, new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT))
val maybeApiError = targetReplicas.flatMap(validateReplicas(tp, _))
maybeApiError match {
case None =>
maybeBuildReassignment(tp, targetReplicas) match {
case Some(context) => partitionsToReassign.put(tp, context)
case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
}
case Some(err) =>
reassignmentResults.put(tp, err)
}
}

Expand All @@ -1588,22 +1590,27 @@ class KafkaController(val config: KafkaConfig,
}
}

private def replicasAreValid(topicPartition: TopicPartition, replicasOpt: Option[Seq[Int]]): Boolean = {
replicasOpt match {
case Some(replicas) =>
val replicaSet = replicas.toSet
if (replicas.isEmpty || replicas.size != replicaSet.size)
false
else if (replicas.exists(_ < 0))
false
else {
// Ensure that any new replicas are among the live brokers
val currentAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
val newAssignment = currentAssignment.reassignTo(replicas)
newAssignment.addingReplicas.toSet.subsetOf(controllerContext.liveBrokerIds)
}

case None => true
/**
 * Validates a target replica list for a partition reassignment.
 *
 * Checks, in order: non-empty list, no duplicate broker ids, no negative broker
 * ids, and finally that every replica being *added* by the reassignment refers
 * to a currently live broker. Returns `None` when the list is acceptable, or
 * `Some(ApiError)` carrying INVALID_REPLICA_ASSIGNMENT and a message describing
 * the first failed check.
 */
private def validateReplicas(topicPartition: TopicPartition, replicas: Seq[Int]): Option[ApiError] = {
  // All failure modes map to the same error code; only the message differs.
  def invalid(message: String): Option[ApiError] =
    Some(new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, message))

  if (replicas.isEmpty)
    invalid(s"Empty replica list specified in partition reassignment.")
  else if (replicas.distinct.size != replicas.size)
    invalid(s"Duplicate replica ids in partition reassignment replica list: $replicas")
  else if (replicas.exists(_ < 0))
    invalid(s"Invalid broker id in replica list: $replicas")
  else {
    // Ensure that any new replicas are among the live brokers
    val currentAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
    val newAssignment = currentAssignment.reassignTo(replicas)
    if (newAssignment.addingReplicas.toSet.subsetOf(controllerContext.liveBrokerIds))
      None
    else
      invalid(s"Replica assignment has brokers that are not alive. Replica list: " +
        s"${newAssignment.addingReplicas}, live broker list: ${controllerContext.liveBrokerIds}")
  }
}

Expand Down
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/utils/json/DecodeJson.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package kafka.utils.json

import scala.collection.{Map, Seq}
import scala.collection.compat._
import scala.language.higherKinds
import scala.collection.JavaConverters._

import com.fasterxml.jackson.databind.{JsonMappingException, JsonNode}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,64 @@
*/
package kafka.admin

import java.util.Optional

import kafka.admin.TopicCommand.ZookeeperTopicService
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import org.junit.Test
import org.apache.kafka.clients.admin.{AdminClientConfig, NewPartitionReassignment, NewTopic, AdminClient => JAdminClient}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.{After, Before, Test}

import scala.collection.JavaConverters._
import scala.collection.Seq

class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness with RackAwareTest {
import ReassignPartitionsIntegrationTest._

var servers: Seq[KafkaServer] = Seq()
val broker1 = 0
val broker2 = 1
val broker3 = 2
val broker4 = 3
val broker5 = 4
val broker6 = 5
val rack = Map(
broker1 -> "rack1",
broker2 -> "rack2",
broker3 -> "rack2",
broker4 -> "rack1",
broker5 -> "rack3",
broker6 -> "rack3"
)

@Before
override def setUp(): Unit = {
  super.setUp()

  // Start a six-broker cluster, pinning each broker to the rack declared in `rack`.
  // Auto leader rebalancing is off so the reassignment tests fully control leadership,
  // and the shutdown/replica-lag timeouts are tightened to keep the tests fast.
  servers = for (props <- TestUtils.createBrokerConfigs(6, zkConnect, enableControlledShutdown = true)) yield {
    props.setProperty(KafkaConfig.RackProp, rack(props.getProperty(KafkaConfig.BrokerIdProp).toInt))
    props.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, "false")
    props.setProperty(KafkaConfig.ControlledShutdownMaxRetriesProp, "1")
    props.setProperty(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "1000")
    props.setProperty(KafkaConfig.ReplicaLagTimeMaxMsProp, "1000")
    TestUtils.createServer(KafkaConfig.fromProps(props))
  }
}

@After
// Stop every broker started in setUp before the ZooKeeper harness is torn down,
// so no server is left holding a ZK session.
override def tearDown(): Unit = {
TestUtils.shutdownServers(servers)
super.tearDown()
}

@Test
def testRackAwareReassign(): Unit = {
val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3")
TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkClient)

val numPartitions = 18
val replicationFactor = 3

Expand All @@ -37,11 +83,93 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness with RackAw

val topicJson = """{"topics": [{"topic": "foo"}], "version":1}"""
val (proposedAssignment, currentAssignment) = ReassignPartitionsCommand.generateAssignment(zkClient,
rackInfo.keys.toSeq.sorted, topicJson, disableRackAware = false)
rack.keys.toSeq.sorted, topicJson, disableRackAware = false)

val assignment = proposedAssignment map { case (topicPartition, replicas) =>
(topicPartition.partition, replicas)
}
checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor)
checkReplicaDistribution(assignment, rack, rack.size, numPartitions, replicationFactor)
}

@Test
def testReassignPartition(): Unit = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a basic reassignment test case. Do you think it is not covered in ReassignPartitionsClusterTest? It's not too clear to me why we have two separate test cases.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is covered. That code uses zkClient and ReassignPartitionsCommand, while this test code uses AdminClient directly. Both of them exercise the same backend code, so maybe it's OK to get rid of this one: once ReassignPartitionsCommand is reworked as part of KIP-500, we will have a duplicate test.

Let me know what you prefer and I will take this test out.

TestUtils.resource(JAdminClient.create(createConfig(servers).asJava)) { client =>
val topic = "test-topic"
val partition = 0: Integer

val partitionAssignment = Map(partition -> Seq(broker1: Integer, broker2:Integer).asJava).asJava
val newTopic = new NewTopic(topic, partitionAssignment)
client.createTopics(Seq(newTopic).asJava).all().get()

val topicPartition = new TopicPartition(topic, partition)

// All sync replicas are in the ISR
TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker1, broker2))

// Reassign replicas to different brokers
client.alterPartitionReassignments(
Map(topicPartition -> reassignmentEntry(Seq(broker3, broker4))).asJava
).all().get()

waitForAllReassignmentsToComplete(client)

// Metadata info is eventually consistent wait for update
TestUtils.waitForReplicasAssigned(client, topicPartition, Seq(broker3, broker4))
// All sync replicas are in the ISR
TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker3, broker4))
}
}

@Test
def testInvalidReplicaIds(): Unit = {
  TestUtils.resource(JAdminClient.create(createConfig(servers).asJava)) { client =>
    val topic = "test-topic"
    val partition = 0: Integer

    val partitionAssignment = Map(partition -> Seq(broker1: Integer, broker2: Integer).asJava).asJava
    val newTopic = new NewTopic(topic, partitionAssignment)
    client.createTopics(Seq(newTopic).asJava).all().get()

    val topicPartition = new TopicPartition(topic, partition)

    // All sync replicas are in the ISR
    TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker1, broker2))

    // Submits a reassignment to the given target replicas and asserts the
    // controller rejects it with InvalidReplicaAssignmentException.
    def expectInvalidAssignment(targetReplicas: Seq[Int]): Unit = {
      val future = client.alterPartitionReassignments(
        Map(topicPartition -> reassignmentEntry(targetReplicas)).asJava
      ).all()
      JTestUtils.assertFutureThrows(future, classOf[InvalidReplicaAssignmentException])
    }

    // Test reassignment with duplicate broker ids
    expectInvalidAssignment(Seq(broker4, broker5, broker5))

    // Test reassignment with invalid broker ids
    expectInvalidAssignment(Seq(-1, broker3))

    // Test reassignment with extra broker ids
    expectInvalidAssignment(Seq(6, broker2, broker3))
  }
}
}

object ReassignPartitionsIntegrationTest {

  /** Admin-client configuration pointing at the PLAINTEXT listener of each given broker. */
  def createConfig(servers: Seq[KafkaServer]): Map[String, Object] = {
    val bootstrap = TestUtils.bootstrapServers(servers, new ListenerName("PLAINTEXT"))
    Map(
      AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrap,
      AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG -> "20000"
    )
  }

  /** Boxes the scala Int replica ids and wraps them in the reassignment entry the admin API expects. */
  def reassignmentEntry(replicas: Seq[Int]): Optional[NewPartitionReassignment] = {
    val boxedReplicas = replicas.map(r => Int.box(r))
    Optional.of(new NewPartitionReassignment(boxedReplicas.asJava))
  }

  /** Blocks until the controller reports no partition reassignment in flight. */
  def waitForAllReassignmentsToComplete(client: JAdminClient): Unit = {
    TestUtils.waitUntilTrue(
      () => client.listPartitionReassignments().reassignments().get().isEmpty,
      s"There still are ongoing reassignments",
      pause = 100L)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,7 @@ object LeaderElectionCommandTest {
}

/**
 * Bootstrap-server string covering the PLAINTEXT listener of every given broker.
 *
 * Delegates to the shared TestUtils helper. The span as rendered still carried the
 * pre-refactor hand-rolled body (which only ever used the first broker's port) ahead
 * of the delegating call; only the merged, delegating body is kept here.
 */
def bootstrapServers(servers: Seq[KafkaServer]): String = {
  TestUtils.bootstrapServers(servers, new ListenerName("PLAINTEXT"))
}

def tempTopicPartitionFile(partitions: Set[TopicPartition]): Path = {
Expand Down
16 changes: 16 additions & 0 deletions core/src/test/scala/unit/kafka/utils/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1553,6 +1553,22 @@ object TestUtils extends Logging {
)
}

/**
 * Waits until the replica ids reported for `partition` equal `brokerIds`
 * (same brokers, same order).
 *
 * Fix: the original flat-mapped the replicas of *every* partition of the topic,
 * so for a multi-partition topic the comparison against a single partition's
 * expected replica list could never succeed. We now filter the topic description
 * down to the requested partition before collecting replica ids.
 */
def waitForReplicasAssigned(client: Admin, partition: TopicPartition, brokerIds: Seq[Int]): Unit = {
  TestUtils.waitUntilTrue(
    () => {
      val description = client.describeTopics(Set(partition.topic).asJava).all.get.asScala
      val replicas = description
        .values
        .flatMap(_.partitions.asScala)
        .filter(_.partition == partition.partition)  // only the partition under test
        .flatMap(_.replicas.asScala)
        .map(_.id)
        .toSeq

      brokerIds == replicas
    },
    s"Expected brokers $brokerIds to be the replicas for $partition"
  )
}

/**
* Capture the console output during the execution of the provided function.
*/
Expand Down