diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index a7dd3ad336a14..ebc684bf89203 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1569,13 +1569,15 @@ class KafkaController(val config: KafkaConfig, val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment] reassignments.foreach { case (tp, targetReplicas) => - if (replicasAreValid(tp, targetReplicas)) { - maybeBuildReassignment(tp, targetReplicas) match { - case Some(context) => partitionsToReassign.put(tp, context) - case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS)) - } - } else { - reassignmentResults.put(tp, new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT)) + val maybeApiError = targetReplicas.flatMap(validateReplicas(tp, _)) + maybeApiError match { + case None => + maybeBuildReassignment(tp, targetReplicas) match { + case Some(context) => partitionsToReassign.put(tp, context) + case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS)) + } + case Some(err) => + reassignmentResults.put(tp, err) } } @@ -1588,22 +1590,27 @@ class KafkaController(val config: KafkaConfig, } } - private def replicasAreValid(topicPartition: TopicPartition, replicasOpt: Option[Seq[Int]]): Boolean = { - replicasOpt match { - case Some(replicas) => - val replicaSet = replicas.toSet - if (replicas.isEmpty || replicas.size != replicaSet.size) - false - else if (replicas.exists(_ < 0)) - false - else { - // Ensure that any new replicas are among the live brokers - val currentAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition) - val newAssignment = currentAssignment.reassignTo(replicas) - newAssignment.addingReplicas.toSet.subsetOf(controllerContext.liveBrokerIds) - } - - case None => true + private def validateReplicas(topicPartition: TopicPartition, replicas: Seq[Int]): Option[ApiError] = { + val replicaSet = replicas.toSet + if (replicas.isEmpty) + Some(new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, + s"Empty replica list specified in partition reassignment.")) + else if (replicas.size != replicaSet.size) { + Some(new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, + s"Duplicate replica ids in partition reassignment replica list: $replicas")) + } else if (replicas.exists(_ < 0)) + Some(new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, + s"Invalid broker id in replica list: $replicas")) + else { + // Ensure that any new replicas are among the live brokers + val currentAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition) + val newAssignment = currentAssignment.reassignTo(replicas) + val areNewReplicasAlive = newAssignment.addingReplicas.toSet.subsetOf(controllerContext.liveBrokerIds) + if (!areNewReplicasAlive) + Some(new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, + s"Replica assignment has brokers that are not alive. Replica list: " + + s"${newAssignment.addingReplicas}, live broker list: ${controllerContext.liveBrokerIds}")) + else None } } diff --git a/core/src/main/scala/kafka/utils/json/DecodeJson.scala b/core/src/main/scala/kafka/utils/json/DecodeJson.scala index 31adace39fe30..d1b4355a6112a 100644 --- a/core/src/main/scala/kafka/utils/json/DecodeJson.scala +++ b/core/src/main/scala/kafka/utils/json/DecodeJson.scala @@ -19,7 +19,6 @@ package kafka.utils.json import scala.collection.{Map, Seq} import scala.collection.compat._ -import scala.language.higherKinds import scala.collection.JavaConverters._ import com.fasterxml.jackson.databind.{JsonMappingException, JsonNode} diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala index c4733860ff006..5d33552fa70a1 100644 --- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala @@ -12,18 +12,64 @@ */ package kafka.admin +import java.util.Optional + import kafka.admin.TopicCommand.ZookeeperTopicService +import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness -import org.junit.Test +import org.apache.kafka.clients.admin.{AdminClientConfig, NewPartitionReassignment, NewTopic, AdminClient => JAdminClient} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.InvalidReplicaAssignmentException +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.test.{TestUtils => JTestUtils} +import org.junit.{After, Before, Test} + +import scala.collection.JavaConverters._ +import scala.collection.Seq class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness with RackAwareTest { + import ReassignPartitionsIntegrationTest._ + + var servers: Seq[KafkaServer] = Seq() + val broker1 = 0 + val broker2 = 1 + val broker3 = 2 + val broker4 = 3 + val broker5 = 4 + val broker6 = 5 + val rack = Map( + broker1 -> "rack1", + broker2 -> "rack2", + broker3 -> "rack2", + broker4 -> "rack1", + broker5 -> "rack3", + broker6 -> "rack3" + ) + + @Before + override def setUp(): Unit = { + super.setUp() + + val brokerConfigs = TestUtils.createBrokerConfigs(6, zkConnect, enableControlledShutdown = true) + servers = brokerConfigs.map { config => + config.setProperty(KafkaConfig.RackProp, rack(config.getProperty(KafkaConfig.BrokerIdProp).toInt)) + config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, "false") + config.setProperty(KafkaConfig.ControlledShutdownMaxRetriesProp, "1") + config.setProperty(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "1000") + config.setProperty(KafkaConfig.ReplicaLagTimeMaxMsProp, "1000") + TestUtils.createServer(KafkaConfig.fromProps(config)) + } + } + + @After + override def tearDown(): Unit = { + TestUtils.shutdownServers(servers) + super.tearDown() + } @Test def testRackAwareReassign(): Unit = { - val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3") - TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkClient) - val numPartitions = 18 val replicationFactor = 3 @@ -37,11 +83,93 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness with RackAw val topicJson = """{"topics": [{"topic": "foo"}], "version":1}""" val (proposedAssignment, currentAssignment) = ReassignPartitionsCommand.generateAssignment(zkClient, - rackInfo.keys.toSeq.sorted, topicJson, disableRackAware = false) + rack.keys.toSeq.sorted, topicJson, disableRackAware = false) val assignment = proposedAssignment map { case (topicPartition, replicas) => (topicPartition.partition, replicas) } - checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor) + checkReplicaDistribution(assignment, rack, rack.size, numPartitions, replicationFactor) + } + + @Test + def testReassignPartition(): Unit = { + TestUtils.resource(JAdminClient.create(createConfig(servers).asJava)) { client => + val topic = "test-topic" + val partition = 0: Integer + + val partitionAssignment = Map(partition -> Seq(broker1: Integer, broker2:Integer).asJava).asJava + val newTopic = new NewTopic(topic, partitionAssignment) + client.createTopics(Seq(newTopic).asJava).all().get() + + val topicPartition = new TopicPartition(topic, partition) + + // All sync replicas are in the ISR + TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker1, broker2)) + + // Reassign replicas to different brokers + client.alterPartitionReassignments( + Map(topicPartition -> reassignmentEntry(Seq(broker3, broker4))).asJava + ).all().get() + + waitForAllReassignmentsToComplete(client) + + // Metadata info is eventually consistent wait for update + TestUtils.waitForReplicasAssigned(client, topicPartition, Seq(broker3, broker4)) + // All sync replicas are in the ISR + TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker3, broker4)) + } + } + + @Test + def testInvalidReplicaIds(): Unit = { + TestUtils.resource(JAdminClient.create(createConfig(servers).asJava)) { client => + val topic = "test-topic" + val partition = 0: Integer + + val partitionAssignment = Map(partition -> Seq(broker1: Integer, broker2: Integer).asJava).asJava + val newTopic = new NewTopic(topic, partitionAssignment) + client.createTopics(Seq(newTopic).asJava).all().get() + + val topicPartition = new TopicPartition(topic, partition) + + // All sync replicas are in the ISR + TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker1, broker2)) + + // Test reassignment with duplicate broker ids + var future = client.alterPartitionReassignments( + Map(topicPartition -> reassignmentEntry(Seq(broker4, broker5, broker5))).asJava + ).all() + JTestUtils.assertFutureThrows(future, classOf[InvalidReplicaAssignmentException]) + + // Test reassignment with invalid broker ids + future = client.alterPartitionReassignments( + Map(topicPartition -> reassignmentEntry(Seq(-1, broker3))).asJava + ).all() + JTestUtils.assertFutureThrows(future, classOf[InvalidReplicaAssignmentException]) + + // Test reassignment with extra broker ids + future = client.alterPartitionReassignments( + Map(topicPartition -> reassignmentEntry(Seq(6, broker2, broker3))).asJava + ).all() + JTestUtils.assertFutureThrows(future, classOf[InvalidReplicaAssignmentException]) + } + } +} + +object ReassignPartitionsIntegrationTest { + def createConfig(servers: Seq[KafkaServer]): Map[String, Object] = { + Map( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> TestUtils.bootstrapServers(servers, new ListenerName("PLAINTEXT")), + AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG -> "20000" + ) + } + + def reassignmentEntry(replicas: Seq[Int]): Optional[NewPartitionReassignment] = { + Optional.of(new NewPartitionReassignment(replicas.map(r => r: Integer).asJava)) + } + + def waitForAllReassignmentsToComplete(client: JAdminClient): Unit = { + TestUtils.waitUntilTrue(() => client.listPartitionReassignments().reassignments().get().isEmpty, + s"There still are ongoing reassignments", pause = 100L) } } diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala index a4f57a400f514..e328f2afb206a 100644 --- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala @@ -314,10 +314,7 @@ object LeaderElectionCommandTest { } def bootstrapServers(servers: Seq[KafkaServer]): String = { - servers.map { server => - val port = server.socketServer.boundPort(ListenerName.normalised("PLAINTEXT")) - s"localhost:$port" - }.headOption.mkString(",") + TestUtils.bootstrapServers(servers, new ListenerName("PLAINTEXT")) } def tempTopicPartitionFile(partitions: Set[TopicPartition]): Path = { diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 535ff45d9918f..6edff6ea73726 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1553,6 +1553,22 @@ object TestUtils extends Logging { ) } + def waitForReplicasAssigned(client: Admin, partition: TopicPartition, brokerIds: Seq[Int]): Unit = { + TestUtils.waitUntilTrue( + () => { + val description = client.describeTopics(Set(partition.topic).asJava).all.get.asScala + val replicas = description + .values + .flatMap(_.partitions.asScala.flatMap(_.replicas.asScala)) + .map(_.id) + .toSeq + + brokerIds == replicas + }, + s"Expected brokers $brokerIds to be the replicas for $partition" + ) + } + /** * Capture the console output during the execution of the provided function. */