diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index a167756f0fd35..9d142a5c20d64 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -20,7 +20,7 @@ package kafka.admin import java.util.Random import java.util.Properties import kafka.api.{TopicMetadata, PartitionMetadata} -import kafka.cluster.Broker +import kafka.cluster.{Broker, Cluster} import kafka.log.LogConfig import kafka.utils.{Logging, ZkUtils, Json} import org.I0Itec.zkclient.ZkClient @@ -57,10 +57,12 @@ object AdminUtils extends Logging { * p7 p8 p9 p5 p6 (3nd replica) */ def assignReplicasToBrokers(brokerList: Seq[Int], + filteredCluster: Cluster, nPartitions: Int, replicationFactor: Int, fixedStartIndex: Int = -1, - startPartitionId: Int = -1) + startPartitionId: Int = -1, + maxReplicaPerRack: Int = -1) : Map[Int, Seq[Int]] = { if (nPartitions <= 0) throw new AdminOperationException("number of partitions must be larger than 0") @@ -79,8 +81,33 @@ object AdminUtils extends Logging { nextReplicaShift += 1 val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size var replicaList = List(brokerList(firstReplicaIndex)) - for (j <- 0 until replicationFactor - 1) - replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size)) + if (maxReplicaPerRack <= 0) { + for (j <- 0 until replicationFactor - 1) + replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size)) + } else { + var rackReplicaCount: mutable.Map[Int, Int] = mutable.Map(filteredCluster.getBroker(brokerList(firstReplicaIndex)).get.rack -> 1) + var k = 0 + for (j <- 0 until replicationFactor - 1) { + var done = false; + while (!done && k < brokerList.size) { + val broker = brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, k, brokerList.size)) + val rack = filteredCluster.getBroker(broker).get.rack + if 
(!(rackReplicaCount contains rack)) { + replicaList ::= broker + rackReplicaCount += (rack -> 1) + done = true; + } else if (rackReplicaCount(rack) < maxReplicaPerRack) { + rackReplicaCount(rack) = rackReplicaCount(rack) + 1 + replicaList ::= broker + done = true; + } + k = k + 1 + } + if (!done) { + throw new AdminOperationException("not enough brokers available in unique racks to meet maxReplicaPerRack limit of " + maxReplicaPerRack) + } + } + } ret.put(currentPartitionId, replicaList.reverse) currentPartitionId = currentPartitionId + 1 } @@ -89,10 +116,12 @@ object AdminUtils extends Logging { def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") { val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) - if (existingPartitionsReplicaList.size == 0) + val existingMaxRackReplication = ZkUtils.getMaxRackReplicationForTopics(zkClient, List(topic)) + if (existingPartitionsReplicaList.size == 0 || existingMaxRackReplication.size == 0) throw new AdminOperationException("The topic %s does not exist".format(topic)) val existingReplicaList = existingPartitionsReplicaList.head._2 + val maxReplicaPerRack = existingMaxRackReplication.head._2 val partitionsToAdd = numPartitions - existingPartitionsReplicaList.size if (partitionsToAdd <= 0) throw new AdminOperationException("The number of partitions for a topic can only be increased") @@ -100,7 +129,7 @@ object AdminUtils extends Logging { // create the new partition replication list val brokerList = ZkUtils.getSortedBrokerList(zkClient) val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "") - AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size) + AdminUtils.assignReplicasToBrokers(brokerList, ZkUtils.getFilteredCluster(zkClient, brokerList), partitionsToAdd, existingReplicaList.size, 
existingReplicaList.head, existingPartitionsReplicaList.size, maxReplicaPerRack) else getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size) @@ -114,7 +143,7 @@ object AdminUtils extends Logging { val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2) // add the new list partitionReplicaList ++= newPartitionReplicaList - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, maxReplicaPerRack = maxReplicaPerRack, update = true) } def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int): Map[Int, List[Int]] = { @@ -151,16 +180,18 @@ object AdminUtils extends Logging { topic: String, partitions: Int, replicationFactor: Int, + maxReplicaPerRack: Int = -1, topicConfig: Properties = new Properties) { val brokerList = ZkUtils.getSortedBrokerList(zkClient) - val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig) + val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, ZkUtils.getFilteredCluster(zkClient, brokerList), partitions, replicationFactor, -1, -1, maxReplicaPerRack) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig, maxReplicaPerRack) } def createOrUpdateTopicPartitionAssignmentPathInZK(zkClient: ZkClient, topic: String, partitionReplicaAssignment: Map[Int, Seq[Int]], config: Properties = new Properties, + maxReplicaPerRack: Int = -1, update: Boolean = false) { // validate arguments Topic.validate(topic) @@ -176,13 +207,13 @@ object AdminUtils extends Logging { writeTopicConfig(zkClient, topic, config) // create the partition assignment - 
writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update) + writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, maxReplicaPerRack, update) } - private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) { + private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]], maxReplicaPerRack: Int, update: Boolean) { try { val zkPath = ZkUtils.getTopicPath(topic) - val jsonPartitionData = ZkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2))) + val jsonPartitionData = ZkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2)), maxReplicaPerRack) if (!update) { info("Topic creation " + jsonPartitionData.toString) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 2637586af99cf..ac12aa6748158 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -94,7 +94,7 @@ object ReassignPartitionsCommand extends Logging { var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]() val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic) groupedByTopic.foreach { topicInfo => - val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size, + val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, ZkUtils.getFilteredCluster(zkClient, brokerListToReassign), topicInfo._2.size, topicInfo._2.head._2.size) partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2)) } diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala 
b/core/src/main/scala/kafka/admin/TopicCommand.scala index 842c11047cca0..d27e91f621e87 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -76,14 +76,15 @@ object TopicCommand { def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) { val topic = opts.options.valueOf(opts.topicOpt) val configs = parseTopicConfigsToBeAdded(opts) + val maxReplicaPerRack = if (opts.options.has(opts.maxRackReplicationOpt)) opts.options.valueOf(opts.maxRackReplicationOpt).intValue else -1 if (opts.options.has(opts.replicaAssignmentOpt)) { val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt)) - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs, maxReplicaPerRack) } else { CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt) val partitions = opts.options.valueOf(opts.partitionsOpt).intValue val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue - AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs) + AdminUtils.createTopic(zkClient, topic, partitions, replicas, maxReplicaPerRack, configs) } println("Created topic \"%s\".".format(topic)) } @@ -101,6 +102,9 @@ object TopicCommand { AdminUtils.changeTopicConfig(zkClient, topic, configs) println("Updated config for topic \"%s\".".format(topic)) } + if(opts.options.has(opts.maxRackReplicationOpt)) { + Utils.croak("Changing the max-rack-replication is not supported.") + } if(opts.options.has(opts.partitionsOpt)) { println("WARNING: If partitions are increased for a topic that has a key, the partition " + "logic or ordering of the messages will be affected") @@ -245,6 +249,10 @@ object TopicCommand { .withRequiredArg .describedAs("replication factor") .ofType(classOf[java.lang.Integer]) + val 
maxRackReplicationOpt = parser.accepts("max-rack-replication", "The maximum number of replicas assigned to a single rack for each partition in the topic being created.") + .withRequiredArg + .describedAs("max rack replication") + .ofType(classOf[java.lang.Integer]) val replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments for the topic being created.") .withRequiredArg .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " + diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index 1d2f81be4f980..22819133ea7d4 100644 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -89,7 +89,7 @@ object ClientUtils extends Logging{ } /** - * Parse a list of broker urls in the form host1:port1, host2:port2, ... + * Parse a list of broker urls in the form host1:port1:rack1, host2:port2:rack2, ... */ def parseBrokerList(brokerListStr: String): Seq[Broker] = { val brokersStr = Utils.parseCsvList(brokerListStr) @@ -100,7 +100,8 @@ object ClientUtils extends Logging{ val brokerInfos = brokerStr.split(":") val hostName = brokerInfos(0) val port = brokerInfos(1).toInt - new Broker(brokerId, hostName, port) + val rack = if (brokerInfos.size > 2) brokerInfos(2).toInt else 0 + new Broker(brokerId, hostName, port, rack) }) } diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index 9407ed21fbbd5..fbc77fc9d8fc3 100644 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -37,7 +37,11 @@ private[kafka] object Broker { val brokerInfo = m.asInstanceOf[Map[String, Any]] val host = brokerInfo.get("host").get.asInstanceOf[String] val port = brokerInfo.get("port").get.asInstanceOf[Int] - new Broker(id, host, port) + val rack = brokerInfo.get("rack") match { + case Some(r) => 
r.asInstanceOf[Int] + case None => 0 + } + new Broker(id, host, port, rack) case None => throw new BrokerNotAvailableException("Broker id %d does not exist".format(id)) } @@ -50,32 +54,34 @@ private[kafka] object Broker { val id = buffer.getInt val host = readShortString(buffer) val port = buffer.getInt - new Broker(id, host, port) + val rack = buffer.getInt + new Broker(id, host, port, rack) } } -private[kafka] case class Broker(val id: Int, val host: String, val port: Int) { +private[kafka] case class Broker(val id: Int, val host: String, val port: Int, val rack: Int) { - override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port) + override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port + ",rack:" + rack) - def getConnectionString(): String = host + ":" + port + def getConnectionString(): String = host + ":" + port + ":" + rack def writeTo(buffer: ByteBuffer) { buffer.putInt(id) writeShortString(buffer, host) buffer.putInt(port) + buffer.putInt(rack) } - def sizeInBytes: Int = shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/ + def sizeInBytes: Int = shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/ + 4 /* rack id*/ override def equals(obj: Any): Boolean = { obj match { case null => false - case n: Broker => id == n.id && host == n.host && port == n.port + case n: Broker => id == n.id && host == n.host && port == n.port && rack == n.rack case _ => false } } - override def hashCode(): Int = hashcode(id, host, port) + override def hashCode(): Int = hashcode(id, host, port, rack) } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index a0267ae2670e8..a2efb98e93ae7 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -52,7 +52,8 @@ class ControllerContext(val zkClient: ZkClient, var 
partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap, var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = - new mutable.HashSet) { + new mutable.HashSet, + var maxRackReplicaAssignment: mutable.Map[String, Int] = mutable.Map.empty) { private var liveBrokersUnderlying: Set[Broker] = Set.empty private var liveBrokerIdsUnderlying: Set[Int] = Set.empty @@ -463,8 +464,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg "reassigned not yet caught up with the leader") val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet + val existingMaxReplicaAssignment = controllerContext.maxRackReplicaAssignment(topicAndPartition.topic) //1. Update AR in ZK with OAR + RAR. - updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq) + updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq, existingMaxReplicaAssignment) //2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR). updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition), newAndOldReplicas.toSeq) @@ -488,7 +490,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg //9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted) stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas) //10. Update AR in ZK with RAR. 
- updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas) + val existingMaxReplicaAssignment = controllerContext.maxRackReplicaAssignment(topicAndPartition.topic) + updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas, existingMaxReplicaAssignment) //11. Update the /admin/reassign_partitions path in ZK to remove this partition. removePartitionFromReassignedPartitions(topicAndPartition) info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition)) @@ -637,6 +640,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq) controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch] + controllerContext.maxRackReplicaAssignment = ZkUtils.getMaxRackReplicationForTopics(zkClient, controllerContext.allTopics.toSeq) controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int] // update the leader and isr cache for all existing partitions from Zookeeper updateLeaderAndIsrCache() @@ -746,10 +750,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } private def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition, - replicas: Seq[Int]) { + replicas: Seq[Int], + maxRackReplication: Int) { val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic.equals(topicAndPartition.topic)) partitionsAndReplicasForThisTopic.put(topicAndPartition, replicas) - updateAssignedReplicasForPartition(topicAndPartition, partitionsAndReplicasForThisTopic) + updateAssignedReplicasForPartition(topicAndPartition, partitionsAndReplicasForThisTopic, maxRackReplication) info("Updated assigned replicas for partition %s being reassigned to %s 
".format(topicAndPartition, replicas.mkString(","))) // update the assigned replica list after a successful zookeeper write controllerContext.partitionReplicaAssignment.put(topicAndPartition, replicas) @@ -811,10 +816,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition, - newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]) { + newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]], + maxRackReplication: Int) { try { val zkPath = ZkUtils.getTopicPath(topicAndPartition.topic) - val jsonPartitionMap = ZkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2))) + val jsonPartitionMap = ZkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2)), maxRackReplication) ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionMap) debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap)) } catch { diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index ac4262a403fc7..90478434b5408 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -387,9 +387,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { controllerContext.allTopics = currentChildren val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq) + val maxRackReplicaAssignment = ZkUtils.getMaxRackReplicationForTopics(zkClient, newTopics.toSeq) controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p => !deletedTopics.contains(p._1.topic)) controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment) + 
controllerContext.maxRackReplicaAssignment.++=(maxRackReplicaAssignment) info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics, deletedTopics, addedPartitionReplicaAssignment)) if(newTopics.size > 0) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 29abc46d22909..b705a1a2006b5 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -574,7 +574,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ErrorMapping.UnknownTopicOrPartitionCode => if (config.autoCreateTopicsEnable) { try { - AdminUtils.createTopic(zkClient, topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor) + AdminUtils.createTopic(zkClient, topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor, config.defaultMaxRackReplication) info("Auto creation of topic %s with %d partitions and replication factor %d is successful!" 
.format(topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor)) } catch { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 3c3aafc2b3f06..c171185606376 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -48,6 +48,12 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the broker id for this server */ val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue)) + /* the rack id for this server */ + val rackId: Int = props.getIntInRange("broker.rack", 0, (0, Int.MaxValue)) + + /* the default max-rack-replication for auto topic creation */ + val defaultMaxRackReplication = props.getIntInRange("default.max.rack.replication", -1, (-1, Int.MaxValue)) + /* the maximum size of message that the server can receive */ val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead, (0, Int.MaxValue)) diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 9dca55c925494..263f9b058570a 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -33,6 +33,7 @@ import java.net.InetAddress * we are dead. 
*/ class KafkaHealthcheck(private val brokerId: Int, + private val rackId: Int, private val advertisedHost: String, private val advertisedPort: Int, private val zkSessionTimeoutMs: Int, @@ -55,7 +56,7 @@ class KafkaHealthcheck(private val brokerId: Int, else advertisedHost val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt - ZkUtils.registerBrokerInZk(zkClient, brokerId, advertisedHostName, advertisedPort, zkSessionTimeoutMs, jmxPort) + ZkUtils.registerBrokerInZk(zkClient, brokerId, advertisedHostName, advertisedPort, rackId, zkSessionTimeoutMs, jmxPort) } /** diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 5e34f95e64eaf..3e55ce59356cf 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -99,7 +99,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg topicConfigManager.startup() /* tell everyone we are alive */ - kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient) + kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.rackId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient) kafkaHealthcheck.startup() diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index b42e52b8e5668..b71b96deb6162 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -185,11 +185,11 @@ object ZkUtils extends Logging { } } - def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) { + def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, rack: Int, timeout: Int, jmxPort: Int) { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id val timestamp = 
SystemTime.milliseconds.toString - val brokerInfo = Json.encode(Map("version" -> 1, "host" -> host, "port" -> port, "jmx_port" -> jmxPort, "timestamp" -> timestamp)) - val expectedBroker = new Broker(id, host, port) + val brokerInfo = Json.encode(Map("version" -> 2, "host" -> host, "port" -> port, "rack" -> rack, "jmx_port" -> jmxPort, "timestamp" -> timestamp)) + val expectedBroker = new Broker(id, host, port, rack) try { createEphemeralPathExpectConflictHandleZKBug(zkClient, brokerIdPath, brokerInfo, expectedBroker, @@ -220,8 +220,8 @@ object ZkUtils extends Logging { /** * Get JSON partition to replica map from zookeeper. */ - def replicaAssignmentZkData(map: Map[String, Seq[Int]]): String = { - Json.encode(Map("version" -> 1, "partitions" -> map)) + def replicaAssignmentZkData(map: Map[String, Seq[Int]], maxRackReplication: Int): String = { + Json.encode(Map("version" -> 2, "partitions" -> map, "max-rack-replication" -> maxRackReplication)) } /** @@ -504,6 +504,18 @@ object ZkUtils extends Logging { cluster } + /* Provide a Cluster object filtered with the given broker list. 
Useful when dealing with a sub-cluster */ + def getFilteredCluster(zkClient: ZkClient, brokerList: Seq[Int]) : Cluster = { + val cluster = new Cluster + for (brokerId <- brokerList) { + ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match { + case Some(brokerInfo) => cluster.add(Broker.createBroker(brokerId, brokerInfo)) + case None => /* do nothing */ + } + } + cluster + } + def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topicAndPartitions: Set[TopicAndPartition]) : mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = { val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch] @@ -516,6 +528,28 @@ object ZkUtils extends Logging { ret } + def getMaxRackReplicationForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[String, Int] = { + val ret = new mutable.HashMap[String, Int]() + topics.foreach { topic => + val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1 + jsonPartitionMapOpt match { + case Some(jsonPartitionMap) => + Json.parseFull(jsonPartitionMap) match { + case Some(m) => m.asInstanceOf[Map[String, Any]].get("max-rack-replication") match { + case Some(repl) => + val maxRackReplication = repl.asInstanceOf[Int] + ret.put(topic, maxRackReplication) + case None => + ret.put(topic, -1) + } + case None => + } + case None => + } + } + ret + } + def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = { val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]] topics.foreach { topic => diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 115e20305a154..e4d739a3d7f1c 100644 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -61,7 +61,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { val server4 =
TestUtils.createServer(new KafkaConfig(configProps4)) servers ++= List(server1, server2, server3, server4) - brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port)) + brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port, s.config.rackId)) // create topics with 1 partition, 2 replicas, one on each broker AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic1, Map(0->Seq(0,1))) diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 59de1b469fece..d8c7e864e4062 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -24,6 +24,7 @@ import kafka.utils._ import kafka.log._ import kafka.zk.ZooKeeperTestHarness import kafka.utils.{Logging, ZkUtils, TestUtils} +import kafka.cluster.{Broker, Cluster} import kafka.common.{TopicExistsException, TopicAndPartition} import kafka.server.{KafkaServer, KafkaConfig} import java.io.File @@ -37,12 +38,12 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // test 0 replication factor intercept[AdminOperationException] { - AdminUtils.assignReplicasToBrokers(brokerList, 10, 0) + AdminUtils.assignReplicasToBrokers(brokerList, ZkUtils.getFilteredCluster(zkClient, brokerList), 10, 0) } // test wrong replication factor intercept[AdminOperationException] { - AdminUtils.assignReplicasToBrokers(brokerList, 10, 6) + AdminUtils.assignReplicasToBrokers(brokerList, ZkUtils.getFilteredCluster(zkClient, brokerList), 10, 6) } // correct assignment @@ -58,11 +59,73 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { 8 -> List(3, 0, 1), 9 -> List(4, 1, 2)) - val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0) + val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, ZkUtils.getFilteredCluster(zkClient, brokerList), 10, 3, 0) 
val e = (expectedAssignment.toList == actualAssignment.toList) assertTrue(expectedAssignment.toList == actualAssignment.toList) } + @Test + def testRackReplicaAssignment() { + val brokerList = List(0, 1, 2, 3, 4) + + // test not enough distinct rack-ids + intercept[AdminOperationException] { + AdminUtils.assignReplicasToBrokers(brokerList, new Cluster(List( + Broker(0, "localhost", 0, 0), + Broker(1, "localhost", 0, 1), + Broker(2, "localhost", 0, 0), + Broker(3, "localhost", 0, 1), + Broker(4, "localhost", 0, 0))), 10, 3, maxReplicaPerRack = 1) + } + + { // correct assignment with max-rack-replication = 1 + val expectedAssignment = Map( + 0 -> List(0, 1, 2), + 1 -> List(1, 2, 3), + 2 -> List(2, 3, 4), + 3 -> List(3, 4, 2), + 4 -> List(4, 0, 2), + 5 -> List(0, 2, 4), + 6 -> List(1, 3, 2), + 7 -> List(2, 4, 0), + 8 -> List(3, 1, 2), + 9 -> List(4, 2, 3)) + + val filteredCluster = new Cluster(List( + Broker(0, "localhost", 0, 0), + Broker(1, "localhost", 0, 1), + Broker(2, "localhost", 0, 2), + Broker(3, "localhost", 0, 0), + Broker(4, "localhost", 0, 1))) + val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, filteredCluster, 10, 3, 0, maxReplicaPerRack = 1) + val e = (expectedAssignment.toList == actualAssignment.toList) + assertTrue(expectedAssignment.toList == actualAssignment.toList) + } + { // correct assignment with max-rack-replication = 2 + val expectedAssignment = Map( + 0 -> List(0, 1, 2), + 1 -> List(1, 2, 3), + 2 -> List(2, 3, 4), + 3 -> List(3, 4, 0), + 4 -> List(4, 0, 1), + 5 -> List(0, 2, 3), + 6 -> List(1, 3, 4), + 7 -> List(2, 4, 0), + 8 -> List(3, 0, 1), + 9 -> List(4, 1, 2)) + + val filteredCluster = new Cluster(List( + Broker(0, "localhost", 0, 0), + Broker(1, "localhost", 0, 1), + Broker(2, "localhost", 0, 2), + Broker(3, "localhost", 0, 0), + Broker(4, "localhost", 0, 1))) + val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, filteredCluster, 10, 3, 0, maxReplicaPerRack = 2) + val e = 
(expectedAssignment.toList == actualAssignment.toList) + assertTrue(expectedAssignment.toList == actualAssignment.toList) + } + } + @Test def testManualReplicaAssignment() { val brokers = List(0, 1, 2, 3, 4) @@ -377,7 +440,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // create a topic with a few config overrides and check that they are applied val maxMessageSize = 1024 val retentionMs = 1000*1000 - AdminUtils.createTopic(server.zkClient, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs)) + AdminUtils.createTopic(server.zkClient, topic, partitions, 1, topicConfig = makeConfig(maxMessageSize, retentionMs)) checkConfig(maxMessageSize, retentionMs) // now double the config values for the topic and check that it is applied diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index 8df0982a1e71e..b411487302924 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -75,7 +75,7 @@ object SerializationTestUtils{ TopicAndPartition(topic2, 3) -> PartitionFetchInfo(4000, 100) ) - private val brokers = List(new Broker(0, "localhost", 1011), new Broker(1, "localhost", 1012), new Broker(2, "localhost", 1013)) + private val brokers = List(new Broker(0, "localhost", 1011, 0), new Broker(1, "localhost", 1012, 0), new Broker(2, "localhost", 1013, 0)) private val partitionMetaData0 = new PartitionMetadata(0, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 0) private val partitionMetaData1 = new PartitionMetadata(1, Some(brokers.head), replicas = brokers, isr = brokers.tail, errorCode = 1) private val partitionMetaData2 = new PartitionMetadata(2, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 2) diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala 
b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index 9347ea60f2c02..57b9179cef6e6 100644 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -48,7 +48,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { val group = "group1" val consumer0 = "consumer0" val consumedOffset = 5 - val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port))) + val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port, c.rackId))) val queue = new LinkedBlockingQueue[FetchedDataChunk] val topicInfos = configs.map(c => new PartitionTopicInfo(topic, 0, diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index 47130d33c36fa..a5b9d72957cbe 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -40,7 +40,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { yield new KafkaConfig(props) val messages = new mutable.HashMap[Int, Seq[Array[Byte]]] val topic = "topic" - val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port))) + val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port, c.rackId))) val shutdown = ZookeeperConsumerConnector.shutdownCommand val queue = new LinkedBlockingQueue[FetchedDataChunk] val topicInfos = configs.map(c => new PartitionTopicInfo(topic, diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 9998a1156d41d..7650ba5ef8b2f 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -34,7 +34,7 @@ class TopicMetadataTest extends 
JUnit3Suite with ZooKeeperTestHarness { val props = createBrokerConfigs(1) val configs = props.map(p => new KafkaConfig(p)) private var server1: KafkaServer = null - val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port)) + val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port, c.rackId)) override def setUp() { super.setUp() diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index 18e355501808c..210583f2d1ab6 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -167,8 +167,8 @@ class AsyncProducerTest extends JUnit3Suite { val props = new Properties() props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) - val broker1 = new Broker(0, "localhost", 9092) - val broker2 = new Broker(1, "localhost", 9093) + val broker1 = new Broker(0, "localhost", 9092, 0) + val broker2 = new Broker(1, "localhost", 9093, 0) broker1 // form expected partitions metadata val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2)) @@ -403,7 +403,7 @@ class AsyncProducerTest extends JUnit3Suite { val config = new ProducerConfig(props) val topic1 = "topic1" - val topic1Metadata = getTopicMetadata(topic1, Array(0, 1), 0, "localhost", 9092) + val topic1Metadata = getTopicMetadata(topic1, Array(0, 1), 0, "localhost", 9092, 0) val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata] topicPartitionInfos.put("topic1", topic1Metadata) @@ -492,12 +492,12 @@ class AsyncProducerTest extends JUnit3Suite { producerDataList } - private def getTopicMetadata(topic: String, partition: Int, brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = { - getTopicMetadata(topic, List(partition), brokerId, brokerHost, brokerPort) + private def getTopicMetadata(topic: String, partition: Int, brokerId: Int, 
brokerHost: String, brokerPort: Int, rackId: Int = 0): TopicMetadata = { + getTopicMetadata(topic, List(partition), brokerId, brokerHost, brokerPort, rackId) } - private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = { - val broker1 = new Broker(brokerId, brokerHost, brokerPort) + private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int, rackId: Int): TopicMetadata = { + val broker1 = new Broker(brokerId, brokerHost, brokerPort, rackId) new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1)))) } diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 38e3ae72a87e1..a2215cfabbfe3 100644 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -123,7 +123,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness { // start another controller val controllerId = 2 val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort())) - val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port)) + val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port, s.config.rackId)) val controllerContext = new ControllerContext(zkClient, 6000) controllerContext.liveBrokers = brokers.toSet val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index d88b6c3e8fd80..98f9066eb3621 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -345,13 +345,13 @@ object TestUtils extends Logging { } def 
createBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = { - val brokers = ids.map(id => new Broker(id, "localhost", 6667)) - brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port, 6000, jmxPort = -1)) + val brokers = ids.map(id => new Broker(id, "localhost", 6667, 0)) + brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port, b.rack, 6000, jmxPort = -1)) brokers } def deleteBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = { - val brokers = ids.map(id => new Broker(id, "localhost", 6667)) + val brokers = ids.map(id => new Broker(id, "localhost", 6667, 0)) brokers.foreach(b => ZkUtils.deletePath(zkClient, ZkUtils.BrokerIdsPath + "/" + b)) brokers }