Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion core/src/main/scala/kafka/admin/AdminUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import kafka.api.{TopicMetadata, PartitionMetadata}
import java.util.Random
import java.util.Properties
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.security.auth.KafkaPrincipal

import scala.Predef._
import scala.collection._
Expand Down Expand Up @@ -231,6 +232,7 @@ object AdminUtils extends Logging {
partitions: Int,
replicationFactor: Int,
topicConfig: Properties = new Properties) {

val brokerList = ZkUtils.getSortedBrokerList(zkClient)
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig)
Expand All @@ -246,7 +248,7 @@ object AdminUtils extends Logging {
require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.")

val topicPath = ZkUtils.getTopicPath(topic)

if (!update) {
if (zkClient.exists(topicPath))
throw new TopicExistsException("Topic \"%s\" already exists.".format(topic))
Expand All @@ -257,6 +259,11 @@ object AdminUtils extends Logging {
throw new InvalidTopicException("Topic \"%s\" collides with existing topics: %s".format(topic, collidingTopics.mkString(", ")))
}
}

//By default, the user that issues the topic creation becomes the owner.
if (!config.containsKey(LogConfig.OwnersProp)) {
config.put(LogConfig.OwnersProp, new KafkaPrincipal(KafkaPrincipal.USER_TYPE, System.getProperty("user.name")).toString)
}
}

partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: " + partitionReplicaAssignment))
Expand Down
20 changes: 18 additions & 2 deletions core/src/main/scala/kafka/log/LogConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ package kafka.log

import java.util.Properties
import kafka.server.KafkaConfig
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Utils
import scala.collection._
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.config.{ConfigException, AbstractConfig, ConfigDef}
import kafka.message.BrokerCompressionCodec
import kafka.message.Message

Expand All @@ -44,6 +45,7 @@ object Defaults {
val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas
val CompressionType = kafka.server.Defaults.CompressionType
val PreAllocateEnable = kafka.server.Defaults.LogPreAllocateEnable
val Owners = KafkaPrincipal.ANONYMOUS.toString
}

case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) {
Expand All @@ -69,6 +71,7 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfi
val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp)
val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase
val preallocate = getBoolean(LogConfig.PreAllocateEnableProp)
val owners = getString(LogConfig.OwnersProp)

def randomSegmentJitter: Long =
if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
Expand Down Expand Up @@ -101,6 +104,7 @@ object LogConfig {
val MinInSyncReplicasProp = "min.insync.replicas"
val CompressionTypeProp = "compression.type"
val PreAllocateEnableProp = "preallocate"
val OwnersProp = "owners"

val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log"
val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is rolled"
Expand All @@ -125,13 +129,14 @@ object LogConfig {
"standard compression codecs ('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to " +
"no compression; and 'producer' which means retain the original compression codec set by the producer."
val PreAllocateEnableDoc ="Should pre allocate file when create new segment?"
val OwnersDoc="Comma separated list of owners of this log. The owners need to be specified as string representation of " +
"KafkaPrincipal which is principalType:name e.g. User:alice,User:bob"

private val configDef = {
import ConfigDef.Range._
import ConfigDef.ValidString._
import ConfigDef.Type._
import ConfigDef.Importance._
import java.util.Arrays.asList

new ConfigDef()
.define(SegmentBytesProp, INT, Defaults.SegmentSize, atLeast(Message.MinHeaderSize), MEDIUM, SegmentSizeDoc)
Expand All @@ -158,6 +163,7 @@ object LogConfig {
.define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), MEDIUM, CompressionTypeDoc)
.define(PreAllocateEnableProp, BOOLEAN, Defaults.PreAllocateEnable,
MEDIUM, PreAllocateEnableDoc)
.define(OwnersProp, STRING, Defaults.Owners, LOW, OwnersDoc)
}

def apply(): LogConfig = LogConfig(new Properties())
Expand Down Expand Up @@ -188,11 +194,21 @@ object LogConfig {
require(names.contains(name), "Unknown configuration \"%s\".".format(name))
}

/**
 * Validate the optional "owners" property: when present, it must be a comma separated
 * list of KafkaPrincipal strings of the form principalType:name, e.g. "User:alice,User:bob".
 *
 * @param props the topic-level config properties to validate
 * @throws ConfigException if any entry cannot be parsed as a KafkaPrincipal
 */
def validateOwners(props: Properties): Unit = {
  if (props.containsKey(OwnersProp)) {
    val owners = props.getProperty(OwnersProp)
    try {
      // foreach, not map: only the parse side effect (a possible failure) matters here
      owners.split(",").foreach(owner => KafkaPrincipal.fromString(owner.trim))
    } catch {
      case e: IllegalArgumentException =>
        throw new ConfigException("Invalid value \"%s\" for property \"%s\": owners must be a comma separated list of kafka principal strings e.g. User:alice,User:bob".format(owners, OwnersProp))
    }
  }
}
/**
 * Ensure the given properties contain only recognized log config names, that the
 * owners list (if present) is well formed, and that every value parses as valid.
 */
def validate(props: Properties): Unit = {
  validateNames(props)
  validateOwners(props)
  configDef.parse(props)
}

Expand Down
3 changes: 3 additions & 0 deletions core/src/test/scala/unit/kafka/admin/AdminTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package kafka.admin

import junit.framework.Assert._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.junit.Test
import java.util.Properties
import kafka.utils._
Expand Down Expand Up @@ -120,6 +121,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
val config: Properties = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
assertEquals(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, System.getProperty("user.name")).toString, config.getProperty(LogConfig.OwnersProp))
// create leaders for all partitions
TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> ZkUtils.getReplicasForPartition(zkClient, topic, p))).toMap
Expand Down
10 changes: 9 additions & 1 deletion core/src/test/scala/unit/kafka/log/LogConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ class LogConfigTest {
def testFromPropsInvalid() {
LogConfig.configNames().foreach((name) => {
name match {
case LogConfig.UncleanLeaderElectionEnableProp => return
case LogConfig.UncleanLeaderElectionEnableProp =>
case LogConfig.OwnersProp =>
case LogConfig.RetentionBytesProp => assertPropertyInvalid(name, "not_a_number")
case LogConfig.RetentionMsProp => assertPropertyInvalid(name, "not_a_number" )
case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar");
Expand All @@ -66,6 +67,13 @@ class LogConfigTest {
})
}

@Test(expected = classOf[ConfigException])
def testInvalidOwner(): Unit = {
  // An owner entry that is not of the form principalType:name must be rejected.
  val props = new Properties()
  props.setProperty(LogConfig.OwnersProp, "invalid")
  LogConfig.validate(props)
}

private def assertPropertyInvalid(name: String, values: AnyRef*) {
values.foreach((value) => {
val props = new Properties
Expand Down