Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 43 additions & 12 deletions core/src/main/scala/kafka/admin/AdminUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package kafka.admin
import java.util.Random
import java.util.Properties
import kafka.api.{TopicMetadata, PartitionMetadata}
import kafka.cluster.Broker
import kafka.cluster.{Broker, Cluster}
import kafka.log.LogConfig
import kafka.utils.{Logging, ZkUtils, Json}
import org.I0Itec.zkclient.ZkClient
Expand Down Expand Up @@ -57,10 +57,12 @@ object AdminUtils extends Logging {
 * p7 p8 p9 p5 p6 (3rd replica)
*/
def assignReplicasToBrokers(brokerList: Seq[Int],
filteredCluster: Cluster,
nPartitions: Int,
replicationFactor: Int,
fixedStartIndex: Int = -1,
startPartitionId: Int = -1)
startPartitionId: Int = -1,
maxReplicaPerRack: Int = -1)
: Map[Int, Seq[Int]] = {
if (nPartitions <= 0)
throw new AdminOperationException("number of partitions must be larger than 0")
Expand All @@ -79,8 +81,33 @@ object AdminUtils extends Logging {
nextReplicaShift += 1
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
var replicaList = List(brokerList(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
if (maxReplicaPerRack <= 0) {
for (j <- 0 until replicationFactor - 1)
replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
} else {
var rackReplicaCount: mutable.Map[Int, Int] = mutable.Map(filteredCluster.getBroker(brokerList(firstReplicaIndex)).get.rack -> 1)
var k = 0
for (j <- 0 until replicationFactor - 1) {
var done = false;
while (!done && k < brokerList.size) {
val broker = brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, k, brokerList.size))
val rack = filteredCluster.getBroker(broker).get.rack
if (!(rackReplicaCount contains rack)) {
replicaList ::= broker
rackReplicaCount += (rack -> 1)
done = true;
} else if (rackReplicaCount(rack) < maxReplicaPerRack) {
rackReplicaCount(rack) = rackReplicaCount(rack) + 1
replicaList ::= broker
done = true;
}
k = k + 1
}
if (!done) {
throw new AdminOperationException("not enough brokers available in unique racks to meet maxReplicaPerRack limit of " + maxReplicaPerRack)
}
}
}
ret.put(currentPartitionId, replicaList.reverse)
currentPartitionId = currentPartitionId + 1
}
Expand All @@ -89,18 +116,20 @@ object AdminUtils extends Logging {

def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") {
val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
if (existingPartitionsReplicaList.size == 0)
val existingMaxRackReplication = ZkUtils.getMaxRackReplicationForTopics(zkClient, List(topic))
if (existingPartitionsReplicaList.size == 0 || existingMaxRackReplication.size == 0)
throw new AdminOperationException("The topic %s does not exist".format(topic))

val existingReplicaList = existingPartitionsReplicaList.head._2
val maxReplicaPerRack = existingMaxRackReplication.head._2
val partitionsToAdd = numPartitions - existingPartitionsReplicaList.size
if (partitionsToAdd <= 0)
throw new AdminOperationException("The number of partitions for a topic can only be increased")

// create the new partition replication list
val brokerList = ZkUtils.getSortedBrokerList(zkClient)
val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "")
AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size)
AdminUtils.assignReplicasToBrokers(brokerList, ZkUtils.getFilteredCluster(zkClient, brokerList), partitionsToAdd, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size, maxReplicaPerRack)
else
getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size)

Expand All @@ -114,7 +143,7 @@ object AdminUtils extends Logging {
val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
// add the new list
partitionReplicaList ++= newPartitionReplicaList
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true)
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, maxReplicaPerRack = maxReplicaPerRack, update = true)
}

def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int): Map[Int, List[Int]] = {
Expand Down Expand Up @@ -151,16 +180,18 @@ object AdminUtils extends Logging {
topic: String,
partitions: Int,
replicationFactor: Int,
maxReplicaPerRack: Int = -1,
topicConfig: Properties = new Properties) {
val brokerList = ZkUtils.getSortedBrokerList(zkClient)
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig)
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, ZkUtils.getFilteredCluster(zkClient, brokerList), partitions, replicationFactor, -1, -1, maxReplicaPerRack)
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig, maxReplicaPerRack)
}

def createOrUpdateTopicPartitionAssignmentPathInZK(zkClient: ZkClient,
topic: String,
partitionReplicaAssignment: Map[Int, Seq[Int]],
config: Properties = new Properties,
maxReplicaPerRack: Int = -1,
update: Boolean = false) {
// validate arguments
Topic.validate(topic)
Expand All @@ -176,13 +207,13 @@ object AdminUtils extends Logging {
writeTopicConfig(zkClient, topic, config)

// create the partition assignment
writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update)
writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, maxReplicaPerRack, update)
}

private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]], maxReplicaPerRack: Int, update: Boolean) {
try {
val zkPath = ZkUtils.getTopicPath(topic)
val jsonPartitionData = ZkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2)))
val jsonPartitionData = ZkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2)), maxReplicaPerRack)

if (!update) {
info("Topic creation " + jsonPartitionData.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ object ReassignPartitionsCommand extends Logging {
var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()
val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic)
groupedByTopic.foreach { topicInfo =>
val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size,
val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, ZkUtils.getFilteredCluster(zkClient, brokerListToReassign), topicInfo._2.size,
topicInfo._2.head._2.size)
partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
}
Expand Down
12 changes: 10 additions & 2 deletions core/src/main/scala/kafka/admin/TopicCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,15 @@ object TopicCommand {
def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
val topic = opts.options.valueOf(opts.topicOpt)
val configs = parseTopicConfigsToBeAdded(opts)
val maxReplicaPerRack = if (opts.options.has(opts.maxRackReplicationOpt)) opts.options.valueOf(opts.maxRackReplicationOpt).intValue else -1
if (opts.options.has(opts.replicaAssignmentOpt)) {
val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs)
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs, maxReplicaPerRack)
} else {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
AdminUtils.createTopic(zkClient, topic, partitions, replicas, maxReplicaPerRack, configs)
}
println("Created topic \"%s\".".format(topic))
}
Expand All @@ -101,6 +102,9 @@ object TopicCommand {
AdminUtils.changeTopicConfig(zkClient, topic, configs)
println("Updated config for topic \"%s\".".format(topic))
}
if(opts.options.has(opts.maxRackReplicationOpt)) {
Utils.croak("Changing the max-rack-replication is not supported.")
}
if(opts.options.has(opts.partitionsOpt)) {
println("WARNING: If partitions are increased for a topic that has a key, the partition " +
"logic or ordering of the messages will be affected")
Expand Down Expand Up @@ -245,6 +249,10 @@ object TopicCommand {
.withRequiredArg
.describedAs("replication factor")
.ofType(classOf[java.lang.Integer])
val maxRackReplicationOpt = parser.accepts("max-rack-replication", "The maximum number of replicas assigned to a single rack for each partition in the topic being created.")
.withRequiredArg
.describedAs("max rack replication")
.ofType(classOf[java.lang.Integer])
val replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments for the topic being created.")
.withRequiredArg
.describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " +
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/client/ClientUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ object ClientUtils extends Logging{
}

/**
* Parse a list of broker urls in the form host1:port1, host2:port2, ...
* Parse a list of broker urls in the form host1:port1:rack1, host2:port2:rack2, ...
*/
def parseBrokerList(brokerListStr: String): Seq[Broker] = {
val brokersStr = Utils.parseCsvList(brokerListStr)
Expand All @@ -100,7 +100,8 @@ object ClientUtils extends Logging{
val brokerInfos = brokerStr.split(":")
val hostName = brokerInfos(0)
val port = brokerInfos(1).toInt
new Broker(brokerId, hostName, port)
val rack = if (brokerInfos.size > 2) brokerInfos(2).toInt else 0
new Broker(brokerId, hostName, port, rack)
})
}

Expand Down
22 changes: 14 additions & 8 deletions core/src/main/scala/kafka/cluster/Broker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ private[kafka] object Broker {
val brokerInfo = m.asInstanceOf[Map[String, Any]]
val host = brokerInfo.get("host").get.asInstanceOf[String]
val port = brokerInfo.get("port").get.asInstanceOf[Int]
new Broker(id, host, port)
val rack = brokerInfo.get("rack") match {
case Some(r) => r.asInstanceOf[Int]
case None => 0
}
new Broker(id, host, port, rack)
case None =>
throw new BrokerNotAvailableException("Broker id %d does not exist".format(id))
}
Expand All @@ -50,32 +54,34 @@ private[kafka] object Broker {
val id = buffer.getInt
val host = readShortString(buffer)
val port = buffer.getInt
new Broker(id, host, port)
val rack = buffer.getInt
new Broker(id, host, port, rack)
}
}

private[kafka] case class Broker(val id: Int, val host: String, val port: Int) {
private[kafka] case class Broker(val id: Int, val host: String, val port: Int, val rack: Int) {

override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port)
override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port + ",rack:" + rack)

def getConnectionString(): String = host + ":" + port
def getConnectionString(): String = host + ":" + port + ":" + rack

def writeTo(buffer: ByteBuffer) {
buffer.putInt(id)
writeShortString(buffer, host)
buffer.putInt(port)
buffer.putInt(rack)
}

def sizeInBytes: Int = shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/
def sizeInBytes: Int = shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/ + 4 /* rack id*/

override def equals(obj: Any): Boolean = {
obj match {
case null => false
case n: Broker => id == n.id && host == n.host && port == n.port
case n: Broker => id == n.id && host == n.host && port == n.port && rack == n.rack
case _ => false
}
}

override def hashCode(): Int = hashcode(id, host, port)
override def hashCode(): Int = hashcode(id, host, port, rack)

}
20 changes: 13 additions & 7 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class ControllerContext(val zkClient: ZkClient,
var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] =
new mutable.HashMap,
var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] =
new mutable.HashSet) {
new mutable.HashSet,
var maxRackReplicaAssignment: mutable.Map[String, Int] = mutable.Map.empty) {

private var liveBrokersUnderlying: Set[Broker] = Set.empty
private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
Expand Down Expand Up @@ -463,8 +464,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
"reassigned not yet caught up with the leader")
val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
val existingMaxReplicaAssignment = controllerContext.maxRackReplicaAssignment(topicAndPartition.topic)
//1. Update AR in ZK with OAR + RAR.
updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq, existingMaxReplicaAssignment)
//2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
newAndOldReplicas.toSeq)
Expand All @@ -488,7 +490,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
//9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
//10. Update AR in ZK with RAR.
updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
val existingMaxReplicaAssignment = controllerContext.maxRackReplicaAssignment(topicAndPartition.topic)
updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas, existingMaxReplicaAssignment)
//11. Update the /admin/reassign_partitions path in ZK to remove this partition.
removePartitionFromReassignedPartitions(topicAndPartition)
info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
Expand Down Expand Up @@ -637,6 +640,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq)
controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
controllerContext.maxRackReplicaAssignment = ZkUtils.getMaxRackReplicationForTopics(zkClient, controllerContext.allTopics.toSeq)
controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
// update the leader and isr cache for all existing partitions from Zookeeper
updateLeaderAndIsrCache()
Expand Down Expand Up @@ -746,10 +750,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}

private def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
replicas: Seq[Int]) {
replicas: Seq[Int],
maxRackReplication: Int) {
val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic.equals(topicAndPartition.topic))
partitionsAndReplicasForThisTopic.put(topicAndPartition, replicas)
updateAssignedReplicasForPartition(topicAndPartition, partitionsAndReplicasForThisTopic)
updateAssignedReplicasForPartition(topicAndPartition, partitionsAndReplicasForThisTopic, maxRackReplication)
info("Updated assigned replicas for partition %s being reassigned to %s ".format(topicAndPartition, replicas.mkString(",")))
// update the assigned replica list after a successful zookeeper write
controllerContext.partitionReplicaAssignment.put(topicAndPartition, replicas)
Expand Down Expand Up @@ -811,10 +816,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}

def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]) {
newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]],
maxRackReplication: Int) {
try {
val zkPath = ZkUtils.getTopicPath(topicAndPartition.topic)
val jsonPartitionMap = ZkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2)))
val jsonPartitionMap = ZkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2)), maxRackReplication)
ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionMap)
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
} catch {
Expand Down
Loading