From bcd82e7ece4c188c64c18c41e1597cbc20ef80bd Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Tue, 14 Jul 2015 17:54:40 -0700 Subject: [PATCH 1/4] KAFKA-2145: Add a log config so users can define topic owners. --- core/src/main/scala/kafka/admin/AdminUtils.scala | 8 +++++++- core/src/main/scala/kafka/log/LogConfig.scala | 5 +++++ core/src/main/scala/kafka/server/KafkaConfig.scala | 1 + core/src/test/scala/unit/kafka/admin/AdminTest.scala | 2 ++ core/src/test/scala/unit/kafka/log/LogConfigTest.scala | 3 ++- 5 files changed, 17 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 9966660cf668f..6b4c0aa124e2d 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -231,6 +231,7 @@ object AdminUtils extends Logging { partitions: Int, replicationFactor: Int, topicConfig: Properties = new Properties) { + val brokerList = ZkUtils.getSortedBrokerList(zkClient) val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor) AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig) @@ -246,7 +247,7 @@ object AdminUtils extends Logging { require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.") val topicPath = ZkUtils.getTopicPath(topic) - + if (!update) { if (zkClient.exists(topicPath)) throw new TopicExistsException("Topic \"%s\" already exists.".format(topic)) @@ -257,6 +258,11 @@ object AdminUtils extends Logging { throw new InvalidTopicException("Topic \"%s\" collides with existing topics: %s".format(topic, collidingTopics.mkString(", "))) } } + + //by default we make user that issues topic creation as the owner. + if(!config.containsKey(LogConfig.OwnersProp)) { + config.put(LogConfig.OwnersProp, System.getProperty("user.name")) + } } partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: " + partitionReplicaAssignment)) diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 7fc7e33bc770d..8456459727215 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -44,6 +44,7 @@ object Defaults { val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas val CompressionType = kafka.server.Defaults.CompressionType val PreAllocateEnable = kafka.server.Defaults.LogPreAllocateEnable + val Owners = kafka.server.Defaults.Owners } case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) { @@ -69,6 +70,7 @@ case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfi val minInSyncReplicas = getInt(LogConfig.MinInSyncReplicasProp) val compressionType = getString(LogConfig.CompressionTypeProp).toLowerCase val preallocate = getBoolean(LogConfig.PreAllocateEnableProp) + val owners = getString(LogConfig.OwnersProp) def randomSegmentJitter: Long = if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs) @@ -101,6 +103,7 @@ object LogConfig { val MinInSyncReplicasProp = "min.insync.replicas" val CompressionTypeProp = "compression.type" val PreAllocateEnableProp = "preallocate" + val OwnersProp = "owners" val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log" val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is rolled" @@ -125,6 +128,7 @@ object LogConfig { "standard compression codecs ('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to " + "no compression; and 'producer' which means retain the original compression codec set by the producer." val PreAllocateEnableDoc ="Should pre allocate file when create new segment?" + val OwnersDoc="Comma separated list of owners of this log." private val configDef = { import ConfigDef.Range._ @@ -158,6 +162,7 @@ object LogConfig { .define(CompressionTypeProp, STRING, Defaults.CompressionType, in(BrokerCompressionCodec.brokerCompressionOptions:_*), MEDIUM, CompressionTypeDoc) .define(PreAllocateEnableProp, BOOLEAN, Defaults.PreAllocateEnable, MEDIUM, PreAllocateEnableDoc) + .define(OwnersProp, STRING, Defaults.Owners, LOW, OwnersDoc) } def apply(): LogConfig = LogConfig(new Properties()) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index d547a01cf7098..cfbe3b352fb37 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -94,6 +94,7 @@ object Defaults { val NumRecoveryThreadsPerDataDir = 1 val AutoCreateTopicsEnable = true val MinInSyncReplicas = 1 + val Owners = "Dr. Who?" /** ********* Replication configuration ***********/ val ControllerSocketTimeoutMs = 30000 diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 9bd8171f484c1..8ceefe222fcdc 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -120,6 +120,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging { TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4)) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) + val config: Properties = AdminUtils.fetchTopicConfig(zkClient, topic) + assertEquals(System.getProperty("user.name"), config.getProperty(LogConfig.OwnersProp)) // create leaders for all partitions TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> ZkUtils.getReplicasForPartition(zkClient, topic, p))).toMap diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 51cd62c30ccc4..323e7796129fa 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -55,7 +55,8 @@ class LogConfigTest { def testFromPropsInvalid() { LogConfig.configNames().foreach((name) => { name match { - case LogConfig.UncleanLeaderElectionEnableProp => return + case LogConfig.UncleanLeaderElectionEnableProp => + case LogConfig.OwnersProp => case LogConfig.RetentionBytesProp => assertPropertyInvalid(name, "not_a_number") case LogConfig.RetentionMsProp => assertPropertyInvalid(name, "not_a_number" ) case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar"); From 5c5584e41936fe32fe8cf7b7993f29a138e55e3c Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Tue, 25 Aug 2015 15:45:33 -0700 Subject: [PATCH 2/4] Fixing some styling issues. --- core/src/main/scala/kafka/admin/AdminUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 6b4c0aa124e2d..c2d5498ee6179 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -260,8 +260,8 @@ object AdminUtils extends Logging { } //by default we make user that issues topic creation as the owner. - if(!config.containsKey(LogConfig.OwnersProp)) { - config.put(LogConfig.OwnersProp, System.getProperty("user.name")) + if (!config.containsKey(LogConfig.OwnersProp)) { + config.put(LogConfig.OwnersProp, System.getProperty("user.name")) } } From a5914b3147477c4f2e88ef90e3abd1466318993a Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Tue, 25 Aug 2015 16:02:07 -0700 Subject: [PATCH 3/4] Fixing a failing unit test as a method used by it was deleted in trunk. --- core/src/test/scala/unit/kafka/admin/AdminTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 8ceefe222fcdc..6811ce9f87b1d 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -120,7 +120,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging { TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4)) // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) - val config: Properties = AdminUtils.fetchTopicConfig(zkClient, topic) + val config: Properties = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic) assertEquals(System.getProperty("user.name"), config.getProperty(LogConfig.OwnersProp)) // create leaders for all partitions TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) From ad2ac06e3ee0775df5f940e49405d7d7776209a0 Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Tue, 15 Sep 2015 17:24:05 -0700 Subject: [PATCH 4/4] Moving to KafkaPrincipal. --- .../main/scala/kafka/admin/AdminUtils.scala | 3 ++- core/src/main/scala/kafka/log/LogConfig.scala | 19 +++++++++++++++---- .../main/scala/kafka/server/KafkaConfig.scala | 1 - .../scala/unit/kafka/admin/AdminTest.scala | 3 ++- .../scala/unit/kafka/log/LogConfigTest.scala | 7 +++++++ 5 files changed, 26 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index c2d5498ee6179..4db204351f53b 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -28,6 +28,7 @@ import kafka.api.{TopicMetadata, PartitionMetadata} import java.util.Random import java.util.Properties import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.security.auth.KafkaPrincipal import scala.Predef._ import scala.collection._ @@ -261,7 +262,7 @@ object AdminUtils extends Logging { //by default we make user that issues topic creation as the owner. if (!config.containsKey(LogConfig.OwnersProp)) { - config.put(LogConfig.OwnersProp, System.getProperty("user.name")) + config.put(LogConfig.OwnersProp, new KafkaPrincipal(KafkaPrincipal.USER_TYPE, System.getProperty("user.name")).toString) } } diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 8456459727215..22c02b1e5070a 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -19,9 +19,10 @@ package kafka.log import java.util.Properties import kafka.server.KafkaConfig +import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.Utils import scala.collection._ -import org.apache.kafka.common.config.{AbstractConfig, ConfigDef} +import org.apache.kafka.common.config.{ConfigException, AbstractConfig, ConfigDef} import kafka.message.BrokerCompressionCodec import kafka.message.Message @@ -44,7 +45,7 @@ object Defaults { val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas val CompressionType = kafka.server.Defaults.CompressionType val PreAllocateEnable = kafka.server.Defaults.LogPreAllocateEnable - val Owners = kafka.server.Defaults.Owners + val Owners = KafkaPrincipal.ANONYMOUS.toString } case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) { @@ -128,14 +129,14 @@ object LogConfig { "standard compression codecs ('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to " + "no compression; and 'producer' which means retain the original compression codec set by the producer." val PreAllocateEnableDoc ="Should pre allocate file when create new segment?" - val OwnersDoc="Comma separated list of owners of this log." + val OwnersDoc="Comma separated list of owners of this log. The owners need to be specified as string representation of " + + "KafkaPrincipal which is principalType:name e.g. User:alice,User:bob" private val configDef = { import ConfigDef.Range._ import ConfigDef.ValidString._ import ConfigDef.Type._ import ConfigDef.Importance._ - import java.util.Arrays.asList new ConfigDef() .define(SegmentBytesProp, INT, Defaults.SegmentSize, atLeast(Message.MinHeaderSize), MEDIUM, SegmentSizeDoc) @@ -193,11 +194,21 @@ object LogConfig { require(names.contains(name), "Unknown configuration \"%s\".".format(name)) } + def validateOwners(props: Properties): Unit = { + if(props.containsKey(OwnersProp)) { + try { + props.getProperty(OwnersProp).split(",").map(owner => KafkaPrincipal.fromString(owner.trim)) + } catch { + case e: IllegalArgumentException => throw new ConfigException("owners must be a comma separated list of kafka principal strings e.g. User:alice,User:bob") + } + } + } /** * Check that the given properties contain only valid log config names and that all values can be parsed and are valid */ def validate(props: Properties) { validateNames(props) + validateOwners(props) configDef.parse(props) } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 961ddea25e273..1e8b2331486ff 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -97,7 +97,6 @@ object Defaults { val NumRecoveryThreadsPerDataDir = 1 val AutoCreateTopicsEnable = true val MinInSyncReplicas = 1 - val Owners = "Dr. Who?" /** ********* Replication configuration ***********/ val ControllerSocketTimeoutMs = 30000 diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 6811ce9f87b1d..4abb8cf3cd544 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -17,6 +17,7 @@ package kafka.admin import junit.framework.Assert._ +import org.apache.kafka.common.security.auth.KafkaPrincipal import org.junit.Test import java.util.Properties import kafka.utils._ @@ -121,7 +122,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging { // create the topic AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment) val config: Properties = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic) - assertEquals(System.getProperty("user.name"), config.getProperty(LogConfig.OwnersProp)) + assertEquals(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, System.getProperty("user.name")).toString, config.getProperty(LogConfig.OwnersProp)) // create leaders for all partitions TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1) val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> ZkUtils.getReplicasForPartition(zkClient, topic, p))).toMap diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 323e7796129fa..05ad8db519f64 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -67,6 +67,13 @@ class LogConfigTest { }) } + @Test(expected = classOf[ConfigException]) + def testInvalidOwner(): Unit = { + val p = new Properties() + p.setProperty(LogConfig.OwnersProp, "invalid") + LogConfig.validate(p) + } + private def assertPropertyInvalid(name: String, values: AnyRef*) { values.foreach((value) => { val props = new Properties