diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 9966660cf668f..4db204351f53b 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -28,6 +28,7 @@ import kafka.api.{TopicMetadata, PartitionMetadata} import java.util.Random import java.util.Properties import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.security.auth.KafkaPrincipal import scala.Predef._ import scala.collection._ @@ -231,6 +232,7 @@ object AdminUtils extends Logging { partitions: Int, replicationFactor: Int, topicConfig: Properties = new Properties) { + val brokerList = ZkUtils.getSortedBrokerList(zkClient) val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor) AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig) @@ -246,7 +248,7 @@ object AdminUtils extends Logging { require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.") val topicPath = ZkUtils.getTopicPath(topic) - + if (!update) { if (zkClient.exists(topicPath)) throw new TopicExistsException("Topic \"%s\" already exists.".format(topic)) @@ -257,6 +259,11 @@ object AdminUtils extends Logging { throw new InvalidTopicException("Topic \"%s\" collides with existing topics: %s".format(topic, collidingTopics.mkString(", "))) } } + + //by default we make the user that issues the topic creation the owner. 
+ if (!config.containsKey(LogConfig.OwnersProp)) { + config.put(LogConfig.OwnersProp, new KafkaPrincipal(KafkaPrincipal.USER_TYPE, System.getProperty("user.name")).toString) + } } partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: " + partitionReplicaAssignment)) diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 7fc7e33bc770d..22c02b1e5070a 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -19,9 +19,10 @@ package kafka.log import java.util.Properties import kafka.server.KafkaConfig +import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.Utils import scala.collection._ -import org.apache.kafka.common.config.{AbstractConfig, ConfigDef} +import org.apache.kafka.common.config.{ConfigException, AbstractConfig, ConfigDef} import kafka.message.BrokerCompressionCodec import kafka.message.Message @@ -44,6 +45,7 @@ object Defaults { val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas val CompressionType = kafka.server.Defaults.CompressionType val PreAllocateEnable = kafka.server.Defaults.LogPreAllocateEnable + val Owners = KafkaPrincipal.ANONYMOUS.toString } case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) { @@ -69,6 +71,7 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfi val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp) val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase val preallocate = getBoolean(LogConfig.PreAllocateEnableProp) + val owners = getString(LogConfig.OwnersProp) def randomSegmentJitter: Long = if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs) @@ -101,6 +104,7 @@ object LogConfig { val MinInSyncReplicasProp = 
"min.insync.replicas" val CompressionTypeProp = "compression.type" val PreAllocateEnableProp = "preallocate" + val OwnersProp = "owners" val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log" val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is rolled" @@ -125,13 +129,14 @@ object LogConfig { "standard compression codecs ('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to " + "no compression; and 'producer' which means retain the original compression codec set by the producer." val PreAllocateEnableDoc ="Should pre allocate file when create new segment?" + val OwnersDoc="Comma separated list of owners of this log. The owners need to be specified as string representation of " + + "KafkaPrincipal which is principalType:name e.g. User:alice,User:bob" private val configDef = { import ConfigDef.Range._ import ConfigDef.ValidString._ import ConfigDef.Type._ import ConfigDef.Importance._ - import java.util.Arrays.asList new ConfigDef() .define(SegmentBytesProp, INT, Defaults.SegmentSize, atLeast(Message.MinHeaderSize), MEDIUM, SegmentSizeDoc) @@ -158,6 +163,7 @@ object LogConfig { .define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), MEDIUM, CompressionTypeDoc) .define(PreAllocateEnableProp, BOOLEAN, Defaults.PreAllocateEnable, MEDIUM, PreAllocateEnableDoc) + .define(OwnersProp, STRING, Defaults.Owners, LOW, OwnersDoc) } def apply(): LogConfig = LogConfig(new Properties()) @@ -188,11 +194,21 @@ object LogConfig { require(names.contains(name), "Unknown configuration \"%s\".".format(name)) } + def validateOwners(props: Properties): Unit = { + if (props.containsKey(OwnersProp)) { + try { + props.getProperty(OwnersProp).split(",").map(owner => KafkaPrincipal.fromString(owner.trim)) + } catch { + case _: IllegalArgumentException => throw new ConfigException("owners must be a comma separated list of kafka principal strings e.g. User:alice,User:bob")
+ } + } + } /** * Check that the given properties contain only valid log config names and that all values can be parsed and are valid */ def validate(props: Properties) { validateNames(props) + validateOwners(props) configDef.parse(props) } diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 9bd8171f484c1..4abb8cf3cd544 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -17,6 +17,7 @@ package kafka.admin import junit.framework.Assert._ +import org.apache.kafka.common.security.auth.KafkaPrincipal import org.junit.Test import java.util.Properties import kafka.utils._ @@ -120,6 +121,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging { TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4)) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + val config: Properties = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic) + assertEquals(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, System.getProperty("user.name")).toString, config.getProperty(LogConfig.OwnersProp)) // create leaders for all partitions TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> ZkUtils.getReplicasForPartition(zkClient, topic, p))).toMap diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 51cd62c30ccc4..05ad8db519f64 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -55,7 +55,8 @@ class LogConfigTest { def testFromPropsInvalid() { LogConfig.configNames().foreach((name) => { name match { - case LogConfig.UncleanLeaderElectionEnableProp => return + case 
LogConfig.UncleanLeaderElectionEnableProp => + case LogConfig.OwnersProp => case LogConfig.RetentionBytesProp => assertPropertyInvalid(name, "not_a_number") case LogConfig.RetentionMsProp => assertPropertyInvalid(name, "not_a_number" ) case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar"); @@ -66,6 +67,13 @@ class LogConfigTest { }) } + @Test(expected = classOf[ConfigException]) + def testInvalidOwner(): Unit = { + val p = new Properties() + p.setProperty(LogConfig.OwnersProp, "invalid") + LogConfig.validate(p) + } + private def assertPropertyInvalid(name: String, values: AnyRef*) { values.foreach((value) => { val props = new Properties