From cb242d459f5339bb2ac1aca2e9a8063888364b41 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Fri, 9 Mar 2018 15:26:42 +0000 Subject: [PATCH 01/17] KAFKA-6726: KIP-277 - Fine Grained ACL for CreateTopics API * Handling CreateTopicsRequest now requires Create auth on Topic resource and does not require Create on Cluster * AclCommand --producer option adjusted * Existing Unit and Integration tests adjusted accordingly https://issues.apache.org/jira/browse/KAFKA-6726 https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API Co-authored-by: Edoardo Comar Co-authored-by: Mickael Maison - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) --- .../common/requests/CreateTopicsResponse.java | 3 +- .../main/scala/kafka/admin/AclCommand.scala | 21 +-- .../main/scala/kafka/server/KafkaApis.scala | 29 ++-- .../kafka/api/AuthorizerIntegrationTest.scala | 35 ++++- .../kafka/api/EndToEndAuthorizationTest.scala | 133 ++++++++++++------ .../api/SaslEndToEndAuthorizationTest.scala | 2 +- .../unit/kafka/admin/AclCommandTest.scala | 10 +- docs/security.html | 2 +- 8 files changed, 160 insertions(+), 75 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java index b1504b1acdffa..54d33a526a7ab 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java @@ -73,7 +73,7 @@ public static Schema[] schemaVersions() { * * REQUEST_TIMED_OUT(7) * INVALID_TOPIC_EXCEPTION(17) - * CLUSTER_AUTHORIZATION_FAILED(31) + * TOPIC_AUTHORIZATION_FAILED(29) * TOPIC_ALREADY_EXISTS(36) * INVALID_PARTITIONS(37) * INVALID_REPLICATION_FACTOR(38) @@ -81,6 +81,7 @@ public static Schema[] schemaVersions() { * INVALID_CONFIG(40) * NOT_CONTROLLER(41) * INVALID_REQUEST(42) + * POLICY_VIOLATION(44) */ private final Map errors; diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 6dd227223e026..40ac082e84337 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -31,7 +31,7 @@ object AclCommand extends Logging { val Newline = scala.util.Properties.lineSeparator val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] ( - Topic -> Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, All), + Topic -> Set(Read, Write, Create, Describe, Delete, DescribeConfigs, AlterConfigs, All), Group -> Set(Read, Describe, Delete, All), Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All), TransactionalId -> Set(Describe, Write, All), @@ -153,13 +153,16 @@ object AclCommand extends Logging { val transactionalIds: Set[Resource] = getResource(opts).filter(_.resourceType == TransactionalId) val enableIdempotence = opts.options.has(opts.idempotentOpt) - val acls = getAcl(opts, Set(Write, Describe)) + val topicAcls = getAcl(opts, Set(Write, Describe, Create)) + val transactionalIdAcls = getAcl(opts, Set(Write, Describe)) - //Write, Describe permission on topics, Create permission on cluster, Write, Describe on transactionalIds - topics.map(_ -> acls).toMap[Resource, Set[Acl]] ++ - transactionalIds.map(_ -> acls).toMap[Resource, Set[Acl]] + - (Resource.ClusterResource -> (getAcl(opts, Set(Create)) ++ - (if (enableIdempotence) getAcl(opts, Set(IdempotentWrite)) else Set.empty[Acl]))) + //Write, Describe, Create permission on topics, Write, Describe on transactionalIds + topics.map(_ -> topicAcls).toMap[Resource, Set[Acl]] ++ + transactionalIds.map(_ -> transactionalIdAcls).toMap[Resource, Set[Acl]] ++ + (if (enableIdempotence) + Map(Resource.ClusterResource -> getAcl(opts, Set(IdempotentWrite))) + else + Map.empty) } private def getConsumerResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = { @@ -168,7 +171,7 @@ object AclCommand extends Logging { val topics: Set[Resource] = getResource(opts).filter(_.resourceType == Topic) val groups: Set[Resource] = resources.filter(_.resourceType == Group) - //Read,Describe on topic, Read on consumerGroup + Create on cluster + //Read, Describe on topic, Read on consumerGroup val acls = getAcl(opts, Set(Read, Describe)) @@ -355,7 +358,7 @@ object AclCommand extends Logging { .ofType(classOf[String]) val producerOpt = parser.accepts("producer", "Convenience option to add/remove ACLs for producer role. " + - "This will generate ACLs that allows WRITE,DESCRIBE on topic and CREATE on cluster. ") + "This will generate ACLs that allows WRITE,DESCRIBE and CREATE on topic.") val consumerOpt = parser.accepts("consumer", "Convenience option to add/remove ACLs for consumer role. " + "This will generate ACLs that allows READ,DESCRIBE on topic and READ on group.") diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 9f1ab62f03dc3..4fd0c2b4d62c2 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -62,6 +62,7 @@ import scala.collection.JavaConverters._ import scala.collection._ import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} +import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails /** * Logic to handle the various Kafka requests @@ -1034,19 +1035,20 @@ class KafkaApis(val requestChannel: RequestChannel, var (authorizedTopics, unauthorizedForDescribeTopics) = topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic))) - var unauthorizedForCreateTopics = Set[String]() + var authorizedForDescribeNotCreateTopics = Set[String]() if (authorizedTopics.nonEmpty) { val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) { if (!authorize(request.session, Create, Resource.ClusterResource)) { - authorizedTopics --= nonExistingTopics - unauthorizedForCreateTopics ++= nonExistingTopics + val unauthorizedForCreateTopics = authorizedTopics.filter(topic => !authorize(request.session, Create, new Resource(Topic, topic))) + authorizedTopics --= unauthorizedForCreateTopics + authorizedForDescribeNotCreateTopics ++= unauthorizedForCreateTopics } } } - val unauthorizedForCreateTopicMetadata = unauthorizedForCreateTopics.map(topic => + val unauthorizedForCreateTopicMetadata = authorizedForDescribeNotCreateTopics.map(topic => new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, isInternal(topic), java.util.Collections.emptyList())) @@ -1424,16 +1426,20 @@ class KafkaApis(val requestChannel: RequestChannel, (topic, new ApiError(Errors.NOT_CONTROLLER, null)) } sendResponseCallback(results) - } else if (!authorize(request.session, Create, Resource.ClusterResource)) { - val results = createTopicsRequest.topics.asScala.map { case (topic, _) => - (topic, new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null)) - } - sendResponseCallback(results) } else { val (validTopics, duplicateTopics) = createTopicsRequest.topics.asScala.partition { case (topic, _) => !createTopicsRequest.duplicateTopics.contains(topic) } + val (authorizedTopics, unauthorizedTopics) = + if (authorize(request.session, Create, Resource.ClusterResource)) { + (validTopics, Map[String, TopicDetails]()) + } else { + validTopics.partition { case (topic, _) => + authorize(request.session, Create, new Resource(Topic, topic)) + } + } + // Special handling to add duplicate topics to the response def sendResponseWithDuplicatesCallback(results: Map[String, ApiError]): Unit = { @@ -1447,14 +1453,15 @@ class KafkaApis(val requestChannel: RequestChannel, duplicateTopics.keySet.map((_, new ApiError(Errors.INVALID_REQUEST, errorMessage))).toMap } else Map.empty - val completeResults = results ++ duplicatedTopicsResults + val unauthorizedTopicsResults = unauthorizedTopics.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null)) + val completeResults = results ++ duplicatedTopicsResults ++ unauthorizedTopicsResults sendResponseCallback(completeResults) } adminManager.createTopics( createTopicsRequest.timeout, createTopicsRequest.validateOnly, - validTopics, + authorizedTopics, sendResponseWithDuplicatesCallback ) } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 4af9b83f3b840..b81b039f0467e 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -73,6 +73,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val groupResource = new Resource(Group, group) val deleteTopicResource = new Resource(Topic, deleteTopic) val transactionalIdResource = new Resource(TransactionalId, transactionalId) + val createTopicResource = new Resource(Topic, createTopic) val groupReadAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read))) val groupDescribeAcl = Map(groupResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe))) @@ -82,6 +83,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter))) val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe))) val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, IdempotentWrite))) + val topicCreateAcl = Map(createTopicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create))) val topicReadAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read))) val topicWriteAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write))) val topicDescribeAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe))) @@ -207,7 +209,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.LEADER_AND_ISR -> clusterAcl, ApiKeys.STOP_REPLICA -> clusterAcl, ApiKeys.CONTROLLED_SHUTDOWN -> clusterAcl, - ApiKeys.CREATE_TOPICS -> clusterCreateAcl, + ApiKeys.CREATE_TOPICS -> topicCreateAcl, ApiKeys.DELETE_TOPICS -> topicDeleteAcl, ApiKeys.DELETE_RECORDS -> topicDeleteAcl, ApiKeys.OFFSET_FOR_LEADER_EPOCH -> clusterAcl, @@ -492,6 +494,37 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } } + @Test + def testCreateTopicAuthorizationWithClusterCreate() { + val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( + ApiKeys.CREATE_TOPICS -> createTopicsRequest + ) + + val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]]( + ApiKeys.CREATE_TOPICS -> clusterCreateAcl, + ) + + for ((key, request) <- requestKeyToRequest) { + removeAllAcls() + val resources = Set[ResourceType](Topic) + + sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false) + + val resourceToAcls = requestKeysToAcls(key) + resourceToAcls.get(topicResource).foreach { acls => + val describeAcls = topicDescribeAcl(topicResource) + val isAuthorized = describeAcls == acls + addAndVerifyAcls(describeAcls, topicResource) + sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized) + removeAllAcls() + } + + for ((resource, acls) <- resourceToAcls) + addAndVerifyAcls(acls, resource) + sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true) + } + } + @Test def testFetchFollowerRequest() { val key = ApiKeys.FETCH diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index a2e5fd9d2b3f9..a1608cddcc32a 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -60,7 +60,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas override val serverCount = 3 override def configureSecurityBeforeServersStart() { - AclCommand.main(clusterAclArgs) + AclCommand.main(clusterBrokerAclArgs) AclCommand.main(topicBrokerReadAclArgs) } @@ -82,23 +82,20 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas val wildcardTopicResource = new Resource(Topic, wildcard) val wildcardGroupResource = new Resource(Group, wildcard) - // Arguments to AclCommand to set ACLs. There are three definitions here: - // 1- Provides read and write access to topic - // 2- Provides only write access to topic - // 3- Provides read access to consumer group - def clusterAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--cluster", - s"--operation=ClusterAction", - s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal") + // Arguments to AclCommand to set ACLs. + def clusterBrokerAclArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--add", + s"--cluster", + s"--operation=ClusterAction", + s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal") def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$wildcard", - s"--operation=Read", - s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal") - def produceAclArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--add", + s"--topic=$wildcard", + s"--operation=Read", + s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal") + def produceAclArgs(topic:String): Array[String] = Array("--authorizer-properties", s"zookeeper.connect=$zkConnect", s"--add", s"--topic=$topic", @@ -124,13 +121,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas s"--topic=$topic", s"--operation=Write", s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") - def consumeAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topic", - s"--group=$group", - s"--consumer", - s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") + def consumeAclArgs(topic: String): Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--add", + s"--topic=$topic", + s"--group=$group", + s"--consumer", + s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") def groupAclArgs: Array[String] = Array("--authorizer-properties", s"zookeeper.connect=$zkConnect", s"--add", @@ -138,20 +135,34 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas s"--operation=Read", s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") def produceConsumeWildcardAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$wildcard", - s"--group=$wildcard", - s"--consumer", - s"--producer", - s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") - - def ClusterActionAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, ClusterAction)) + s"zookeeper.connect=$zkConnect", + s"--add", + s"--topic=$wildcard", + s"--group=$wildcard", + s"--consumer", + s"--producer", + s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") + def clusterCreateAclArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--add", + s"--cluster", + s"--operation=Create", + s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") + def topicWriteAclArgs(topic:String): Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--add", + s"--topic=$topic", + s"--operation=Write", + s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") + + def ClusterBrokerActionAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, ClusterAction)) def TopicBrokerReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, Read)) def GroupReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Read)) def TopicReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Read)) def TopicWriteAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Write)) def TopicDescribeAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Describe)) + def TopicCreateAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Create)) + def ClusterCreateAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Create)) // The next two configuration parameters enable ZooKeeper secure ACLs // and sets the Kafka authorizer, both necessary to enable security. this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") @@ -160,6 +171,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1") this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") this.serverConfig.setProperty(KafkaConfig.MinInSyncReplicasProp, "3") + this.serverConfig.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3") this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group") /** @@ -169,7 +181,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas override def setUp() { super.setUp() servers.foreach { s => - TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, Resource.ClusterResource) + TestUtils.waitAndVerifyAcls(ClusterBrokerActionAcl, s.apis.authorizer.get, Resource.ClusterResource) TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, "*")) } // create the test topic with all the brokers as replicas @@ -200,14 +212,14 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas */ @Test def testProduceConsumeViaAssign(): Unit = { - setAclsAndProduce() + setAclsAndProduce(tp) consumers.head.assign(List(tp).asJava) consumeRecords(this.consumers.head, numRecords) } @Test def testProduceConsumeViaSubscribe(): Unit = { - setAclsAndProduce() + setAclsAndProduce(tp) consumers.head.subscribe(List(topic).asJava) consumeRecords(this.consumers.head, numRecords) } @@ -228,11 +240,40 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } } - protected def setAclsAndProduce() { - AclCommand.main(produceAclArgs) - AclCommand.main(consumeAclArgs) + def testProduceConsumeTopicAutoCreateTopicCreateAcl(): Unit = { + // topic2 is not created on setup() + val tp2 = new TopicPartition("topic2", 0) + setAclsAndProduce(tp2) + consumers.head.assign(List(tp2).asJava) + consumeRecords(this.consumers.head, numRecords, topic = tp2.topic) + } + + @Test + def testProduceConsumeTopicAutoCreateClusterCreateAcl(): Unit = { + // topic2 is not created on setup() + val tp2 = new TopicPartition("topic2", 0) + setClusterCreateAclsAndProduce(tp2) + consumers.head.assign(List(tp2).asJava) + consumeRecords(this.consumers.head, numRecords, topic = tp2.topic) + } + + protected def setClusterCreateAclsAndProduce(tp: TopicPartition) { + AclCommand.main(clusterCreateAclArgs) + AclCommand.main(topicWriteAclArgs(tp.topic)) + AclCommand.main(consumeAclArgs(tp.topic)) + servers.foreach { s => + TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, new Resource(Topic, tp.topic)) + TestUtils.waitAndVerifyAcls(ClusterBrokerActionAcl ++ ClusterCreateAcl, s.apis.authorizer.get, clusterResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) + } + sendRecords(numRecords, tp) + } + + protected def setAclsAndProduce(tp: TopicPartition) { + AclCommand.main(produceAclArgs(tp.topic)) + AclCommand.main(consumeAclArgs(tp.topic)) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, new Resource(Topic, tp.topic)) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) } sendRecords(numRecords, tp) @@ -283,10 +324,10 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } private def noConsumeWithoutDescribeAclSetup(): Unit = { - AclCommand.main(produceAclArgs) + AclCommand.main(produceAclArgs(tp.topic)) AclCommand.main(groupAclArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) } @@ -328,10 +369,10 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } private def noConsumeWithDescribeAclSetup(): Unit = { - AclCommand.main(produceAclArgs) + AclCommand.main(produceAclArgs(tp.topic)) AclCommand.main(groupAclArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) } sendRecords(numRecords, tp) @@ -343,9 +384,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas */ @Test def testNoGroupAcl(): Unit = { - AclCommand.main(produceAclArgs) + AclCommand.main(produceAclArgs(tp.topic)) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource) } sendRecords(numRecords, tp) consumers.head.assign(List(tp).asJava) diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala index a5bf33171a4a0..643cd4ce6afbb 100644 --- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala @@ -56,7 +56,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { */ @Test(timeout = 15000) def testTwoConsumersWithDifferentSaslCredentials(): Unit = { - setAclsAndProduce() + setAclsAndProduce(tp) val consumer1 = consumers.head val consumer2Config = new Properties diff --git a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala index 9b6272860ed7a..9197f79882f08 100644 --- a/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AclCommandTest.scala @@ -49,8 +49,8 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { ) private val ResourceToOperations = Map[Set[Resource], (Set[Operation], Array[String])]( - TopicResources -> (Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs), - Array("--operation", "Read" , "--operation", "Write", "--operation", "Describe", "--operation", "Delete", + TopicResources -> (Set(Read, Write, Create, Describe, Delete, DescribeConfigs, AlterConfigs), + Array("--operation", "Read" , "--operation", "Write", "--operation", "Create", "--operation", "Describe", "--operation", "Delete", "--operation", "DescribeConfigs", "--operation", "AlterConfigs")), Set(Resource.ClusterResource) -> (Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite), Array("--operation", "Create", "--operation", "ClusterAction", "--operation", "DescribeConfigs", @@ -61,10 +61,10 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging { ) private def ProducerResourceToAcls(enableIdempotence: Boolean = false) = Map[Set[Resource], Set[Acl]]( - TopicResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe), Hosts), + TopicResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe, Create), Hosts), TransactionalIdResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe), Hosts), - Set(Resource.ClusterResource) -> AclCommand.getAcls(Users, Allow, Set(Some(Create), - if (enableIdempotence) Some(IdempotentWrite) else None).flatten, Hosts) + Set(Resource.ClusterResource) -> AclCommand.getAcls(Users, Allow, + Set(if (enableIdempotence) Some(IdempotentWrite) else None).flatten, Hosts) ) private val ConsumerResourceToAcls = Map[Set[Resource], Set[Acl]]( diff --git a/docs/security.html b/docs/security.html index 4fcbdad72a5a9..c7b06fc4be7ea 100644 --- a/docs/security.html +++ b/docs/security.html @@ -1115,7 +1115,7 @@

Command Line Interface --producer Convenience option to add/remove acls for producer role. This will generate acls that allows WRITE, - DESCRIBE on topic and CREATE on cluster. + DESCRIBE and CREATE on topic. Convenience From 1a1584ce525647ef025938d034d1e7b172cb859f Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Fri, 25 May 2018 14:51:12 +0100 Subject: [PATCH 02/17] upgrade notes --- docs/upgrade.html | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/upgrade.html b/docs/upgrade.html index 056fb8366e44f..dc858af3a14c7 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -97,6 +97,7 @@
Notable changes in 2 will be removed in a future version.
  • The internal method kafka.admin.AdminClient.deleteRecordsBefore has been removed. Users are encouraged to migrate to org.apache.kafka.clients.admin.AdminClient.deleteRecords.
  • The tool kafka.tools.ReplayLogProducer has been removed.
  • +
  • The AclCommand tool --producer convenience option uses the KIP-277 finer grained ACL on a given topic.
  • New Protocol Versions
    From 099c53973dd2c1b769970efcaf6d183cf0c1e0da Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Fri, 25 May 2018 15:43:37 +0100 Subject: [PATCH 03/17] minor improvement to test with auto-create option --- .../integration/kafka/api/AuthorizerIntegrationTest.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index b81b039f0467e..889251655eaa7 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -595,6 +595,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { case e: TopicAuthorizationException => assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) } + // using cluster-level create Acl in this test, leaving topic-level create Acl in the matching `read` test addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource) sendRecords(numRecords, topicPartition) } @@ -839,7 +840,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val newTopicResource = new Resource(Topic, newTopic) addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), newTopicResource) addAndVerifyAcls(groupReadAcl(groupResource), groupResource) - addAndVerifyAcls(clusterAcl(Resource.ClusterResource), Resource.ClusterResource) try { this.consumers.head.assign(List(topicPartition).asJava) consumeRecords(this.consumers.head) @@ -849,8 +849,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) } - addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), newTopicResource) - addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource) + // using topic-level create Acl in this test, leaving cluster-level create Acl in the matching `write` test + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write), new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), + newTopicResource) sendRecords(numRecords, topicPartition) consumeRecords(this.consumers.head, topic = newTopic, part = 0) From b2e67b11354a53667b86b00ea76ae9587e6a9edf Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Tue, 29 May 2018 14:16:02 +0100 Subject: [PATCH 04/17] KAFKA-6726: KIP-277 - Fine Grained ACL for CreateTopics API * Handling CreateTopicsRequest now requires Create auth on Topic resource and does not require Create on Cluster * AclCommand --producer option adjusted * Existing Unit and Integration tests adjusted accordingly * upgrade notes https://issues.apache.org/jira/browse/KAFKA-6726 https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API Co-authored-by: Edoardo Comar Co-authored-by: Mickael Maison - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) --- .../scala/integration/kafka/api/EndToEndAuthorizationTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index a1608cddcc32a..ad5978ba90131 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -235,7 +235,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas private def setWildcardResourceAcls() { AclCommand.main(produceConsumeWildcardAclArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicBrokerReadAcl, s.apis.authorizer.get, wildcardTopicResource) + TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.apis.authorizer.get, wildcardTopicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, wildcardGroupResource) } } From ea58fc3b1f927d05b25b0f3e91e77bb7b19c7b63 Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Mon, 4 Jun 2018 18:18:33 +0100 Subject: [PATCH 05/17] update following review check all paths on topic auto creation simplified testCreateTopicAuthorizationWithClusterCreate --- .../kafka/api/AuthorizerIntegrationTest.scala | 93 +++++++++++-------- 1 file changed, 55 insertions(+), 38 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 889251655eaa7..de4aa0c7c598c 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -17,6 +17,7 @@ import java.util import java.util.concurrent.ExecutionException import java.util.regex.Pattern import java.util.{ArrayList, Collections, Properties} +import java.time.Duration import kafka.admin.AdminClient import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService} @@ -496,33 +497,22 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testCreateTopicAuthorizationWithClusterCreate() { - val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( - ApiKeys.CREATE_TOPICS -> createTopicsRequest - ) + removeAllAcls() + val resources = Set[ResourceType](Topic) - val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]]( - ApiKeys.CREATE_TOPICS -> clusterCreateAcl, - ) + sendRequestAndVerifyResponseError(ApiKeys.CREATE_TOPICS, createTopicsRequest, resources, isAuthorized = false) - for ((key, request) <- requestKeyToRequest) { + clusterCreateAcl.get(topicResource).foreach { acls => + val describeAcls = topicDescribeAcl(topicResource) + val isAuthorized = describeAcls == acls + addAndVerifyAcls(describeAcls, topicResource) + sendRequestAndVerifyResponseError(ApiKeys.CREATE_TOPICS, createTopicsRequest, resources, isAuthorized = isAuthorized) removeAllAcls() - val resources = Set[ResourceType](Topic) - - sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false) - - val resourceToAcls = requestKeysToAcls(key) - resourceToAcls.get(topicResource).foreach { acls => - val describeAcls = topicDescribeAcl(topicResource) - val isAuthorized = describeAcls == acls - addAndVerifyAcls(describeAcls, topicResource) - sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = isAuthorized) - removeAllAcls() - } - - for ((resource, acls) <- resourceToAcls) - addAndVerifyAcls(acls, resource) - sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true) } + + for ((resource, acls) <- clusterCreateAcl) + addAndVerifyAcls(acls, resource) + sendRequestAndVerifyResponseError(ApiKeys.CREATE_TOPICS, createTopicsRequest, resources, isAuthorized = true) } @Test @@ -584,19 +574,34 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } @Test - def testCreatePermissionNeededForWritingToNonExistentTopic() { - val newTopic = "newTopic" + def testCreatePermissionOnTopicToWriteToNonExistentTopic() { + testCreatePermissionNeededToWriteToNonExistentTopic("newTopic", + Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write), new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), + Topic) + } + + @Test + def testCreatePermissionOnClusterToWriteToNonExistentTopic() { + testCreatePermissionNeededToWriteToNonExistentTopic("newTopic", + Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), + Cluster) + } + + private def testCreatePermissionNeededToWriteToNonExistentTopic(newTopic: String, acls: Set[Acl], resType: ResourceType) { val topicPartition = new TopicPartition(newTopic, 0) - addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic)) + val newTopicResource = new Resource(Topic, newTopic) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), newTopicResource) try { sendRecords(numRecords, topicPartition) Assert.fail("should have thrown exception") } catch { - case e: TopicAuthorizationException => assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) + case e: TopicAuthorizationException => + assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) } - // using cluster-level create Acl in this test, leaving topic-level create Acl in the matching `read` test - addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource) + val resource = if (resType == Topic) newTopicResource else Resource.ClusterResource + addAndVerifyAcls(acls, resource) + sendRecords(numRecords, topicPartition) } @@ -834,27 +839,39 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } @Test - def testCreatePermissionNeededToReadFromNonExistentTopic() { - val newTopic = "newTopic" + def testCreatePermissionOnTopicToReadFromNonExistentTopic() { + testCreatePermissionNeededToReadFromNonExistentTopic("newTopic", + Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), + Topic) + } + + @Test + def testCreatePermissionOnClusterToReadFromNonExistentTopic() { + testCreatePermissionNeededToReadFromNonExistentTopic("newTopic", + Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), + Cluster) + } + + private def testCreatePermissionNeededToReadFromNonExistentTopic(newTopic: String, acls: Set[Acl], resType: ResourceType) { val topicPartition = new TopicPartition(newTopic, 0) val newTopicResource = new Resource(Topic, newTopic) addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), newTopicResource) addAndVerifyAcls(groupReadAcl(groupResource), groupResource) try { this.consumers.head.assign(List(topicPartition).asJava) - consumeRecords(this.consumers.head) - Assert.fail("should have thrown exception") + this.consumers.head.poll(Duration.ofMillis(50L)); + Assert.fail("should have thrown Authorization Exception") } catch { case e: TopicAuthorizationException => assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) } - // using topic-level create Acl in this test, leaving cluster-level create Acl in the matching `write` test - addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write), new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), - newTopicResource) + val resource = if (resType == Topic) newTopicResource else Resource.ClusterResource + addAndVerifyAcls(acls, resource) - sendRecords(numRecords, topicPartition) - consumeRecords(this.consumers.head, topic = newTopic, part = 0) + // need to check twice as a single poll may not cause topic creation + this.consumers.head.poll(Duration.ofMillis(50L)); + this.consumers.head.poll(Duration.ofMillis(50L)); } @Test(expected = classOf[AuthorizationException]) From afd55d77a2fd0f7e9ecbc01b4d145fa704aafc91 Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Tue, 5 Jun 2018 10:21:56 +0100 Subject: [PATCH 06/17] avoiding double poll in test --- .../integration/kafka/api/AuthorizerIntegrationTest.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index de4aa0c7c598c..a90e391d6e598 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -857,8 +857,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val newTopicResource = new Resource(Topic, newTopic) addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), newTopicResource) addAndVerifyAcls(groupReadAcl(groupResource), groupResource) + this.consumers.head.assign(List(topicPartition).asJava) try { - this.consumers.head.assign(List(topicPartition).asJava) this.consumers.head.poll(Duration.ofMillis(50L)); Assert.fail("should have thrown Authorization Exception") } catch { @@ -869,9 +869,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val resource = if (resType == Topic) newTopicResource else Resource.ClusterResource addAndVerifyAcls(acls, resource) - // need to check twice as a single poll may not cause topic creation - this.consumers.head.poll(Duration.ofMillis(50L)); - this.consumers.head.poll(Duration.ofMillis(50L)); + // need to use a larger timeout in this subsequent poll else it may not cause topic auto-creation + // this can be verified by commenting the above addAndVerifyAcls line and expecting this test to fail + this.consumers.head.poll(Duration.ofMillis(300L)); } @Test(expected = classOf[AuthorizationException]) From c3bf9fe7841649541b621e327a24147192382f4e Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Tue, 5 Jun 2018 11:50:36 +0100 Subject: [PATCH 07/17] syntax simplification following review --- core/src/main/scala/kafka/admin/AclCommand.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 40ac082e84337..4409a187ae233 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -157,8 +157,8 @@ object AclCommand extends Logging { val transactionalIdAcls = getAcl(opts, Set(Write, Describe)) //Write, Describe, Create permission on topics, Write, Describe on transactionalIds - topics.map(_ -> topicAcls).toMap[Resource, Set[Acl]] ++ - transactionalIds.map(_ -> transactionalIdAcls).toMap[Resource, Set[Acl]] ++ + topics.map(_ -> topicAcls).toMap ++ + transactionalIds.map(_ -> transactionalIdAcls).toMap ++ (if (enableIdempotence) Map(Resource.ClusterResource -> getAcl(opts, Set(IdempotentWrite))) else @@ -175,8 +175,8 @@ object AclCommand extends Logging { val acls = getAcl(opts, Set(Read, Describe)) - topics.map(_ -> acls).toMap[Resource, Set[Acl]] ++ - groups.map(_ -> getAcl(opts, Set(Read))).toMap[Resource, Set[Acl]] + topics.map(_ -> acls).toMap ++ + groups.map(_ -> getAcl(opts, Set(Read))).toMap } private def getCliResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = { From b86c3c2fb057eea332be167a31d4019908502bed Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Tue, 5 Jun 2018 12:09:52 +0100 Subject: [PATCH 08/17] addressed review comment --- .../integration/kafka/api/AuthorizerIntegrationTest.scala | 8 -------- 1 file changed, 8 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index a90e391d6e598..b78cfd3705782 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -502,14 +502,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest { sendRequestAndVerifyResponseError(ApiKeys.CREATE_TOPICS, createTopicsRequest, resources, isAuthorized = false) - clusterCreateAcl.get(topicResource).foreach { acls => - val describeAcls = topicDescribeAcl(topicResource) - val isAuthorized = describeAcls == acls - addAndVerifyAcls(describeAcls, topicResource) - sendRequestAndVerifyResponseError(ApiKeys.CREATE_TOPICS, createTopicsRequest, resources, isAuthorized = isAuthorized) - removeAllAcls() - } - for ((resource, acls) <- clusterCreateAcl) addAndVerifyAcls(acls, resource) sendRequestAndVerifyResponseError(ApiKeys.CREATE_TOPICS, createTopicsRequest, resources, isAuthorized = true) From 59d1822dad217cecf5d3d68cc184484cb6ad6f4a Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Tue, 5 Jun 2018 14:04:31 +0100 Subject: [PATCH 09/17] using TestUtil loop to check for topic existence --- .../integration/kafka/api/AuthorizerIntegrationTest.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index b78cfd3705782..8dfb6aab4a1fe 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -861,9 +861,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val resource = if (resType == Topic) newTopicResource else Resource.ClusterResource addAndVerifyAcls(acls, resource) - // need to use a larger timeout in this subsequent poll else it may not cause topic auto-creation - // this can be verified by commenting the above addAndVerifyAcls line and expecting this test to fail - this.consumers.head.poll(Duration.ofMillis(300L)); + // need to retry to avoid using a long timeout + TestUtils.waitUntilTrue(() => { + this.consumers.head.poll(Duration.ofMillis(50L)) + this.zkClient.topicExists(newTopic) + }, "Expected topic was not created") } @Test(expected = classOf[AuthorizationException]) From 8226bfca85bb6f58e9320a5d70329f09a5b17d94 Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Tue, 5 Jun 2018 14:07:44 +0100 Subject: [PATCH 10/17] fixed extra space --- core/src/main/scala/kafka/server/KafkaApis.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 4fd0c2b4d62c2..06b994be9f3f0 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1453,7 +1453,7 @@ class KafkaApis(val requestChannel: RequestChannel, duplicateTopics.keySet.map((_, new ApiError(Errors.INVALID_REQUEST, errorMessage))).toMap } else Map.empty - val unauthorizedTopicsResults = unauthorizedTopics.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null)) + val unauthorizedTopicsResults = unauthorizedTopics.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null)) val completeResults = results ++ duplicatedTopicsResults ++ unauthorizedTopicsResults sendResponseCallback(completeResults) } From d4e5fc4f8137b9ffe9818ba918454fe522190472 Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Tue, 5 Jun 2018 14:22:08 +0100 Subject: [PATCH 11/17] addressed review comment --- .../kafka/api/AuthorizerIntegrationTest.scala | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 8dfb6aab4a1fe..6a77c5c32ac8c 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -567,32 +567,28 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testCreatePermissionOnTopicToWriteToNonExistentTopic() { - testCreatePermissionNeededToWriteToNonExistentTopic("newTopic", - Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write), new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), - Topic) + testCreatePermissionNeededToWriteToNonExistentTopic(Topic) } @Test def testCreatePermissionOnClusterToWriteToNonExistentTopic() { - testCreatePermissionNeededToWriteToNonExistentTopic("newTopic", - Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), - Cluster) + testCreatePermissionNeededToWriteToNonExistentTopic(Cluster) } - private def testCreatePermissionNeededToWriteToNonExistentTopic(newTopic: String, acls: Set[Acl], resType: ResourceType) { - val topicPartition = new TopicPartition(newTopic, 0) - val newTopicResource = new Resource(Topic, newTopic) + private def testCreatePermissionNeededToWriteToNonExistentTopic(resType: ResourceType) { + val topicPartition = new TopicPartition(createTopic, 0) + val newTopicResource = new Resource(Topic, createTopic) addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Write)), newTopicResource) try { sendRecords(numRecords, topicPartition) Assert.fail("should have thrown exception") } catch { case e: TopicAuthorizationException => - assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) + assertEquals(Collections.singleton(createTopic), e.unauthorizedTopics()) } val resource = if (resType == Topic) newTopicResource else Resource.ClusterResource - addAndVerifyAcls(acls, resource) + addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)), resource) sendRecords(numRecords, topicPartition) } From d82f725e63b60dbed87a7e7fff371f8982a7ad99 Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Tue, 5 Jun 2018 14:22:23 +0100 Subject: [PATCH 12/17] missing @Test annotation --- .../scala/integration/kafka/api/EndToEndAuthorizationTest.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index ad5978ba90131..05d5b6fa8f9d1 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -240,6 +240,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } } + @Test def testProduceConsumeTopicAutoCreateTopicCreateAcl(): Unit = { // topic2 is not created on setup() val tp2 = new TopicPartition("topic2", 0) From ce3db5a60f4adf3624d1e016b9f4c924869a60e0 Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Tue, 5 Jun 2018 14:29:06 +0100 Subject: [PATCH 13/17] split long line --- core/src/main/scala/kafka/server/KafkaApis.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 06b994be9f3f0..08be97f0168b4 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1041,7 +1041,8 @@ class KafkaApis(val requestChannel: RequestChannel, val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) { if (!authorize(request.session, Create, Resource.ClusterResource)) { - val unauthorizedForCreateTopics = authorizedTopics.filter(topic => !authorize(request.session, Create, new Resource(Topic, topic))) + val unauthorizedForCreateTopics = authorizedTopics.filter( + topic => !authorize(request.session, Create, new Resource(Topic, topic))) authorizedTopics --= unauthorizedForCreateTopics authorizedForDescribeNotCreateTopics ++= unauthorizedForCreateTopics } From b89ff7a31413d63123d1eedb7c89eee2d9674bc7 Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Tue, 5 Jun 2018 18:02:33 +0100 Subject: [PATCH 14/17] removed overkill testProduceConsumeTopicAutoCreateClusterCreateAcl --- .../kafka/api/EndToEndAuthorizationTest.scala | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 05d5b6fa8f9d1..214af028db02b 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -249,27 +249,6 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas consumeRecords(this.consumers.head, numRecords, topic = tp2.topic) } - @Test - def testProduceConsumeTopicAutoCreateClusterCreateAcl(): Unit = { - // topic2 is not created on setup() - val tp2 = new TopicPartition("topic2", 0) - setClusterCreateAclsAndProduce(tp2) - consumers.head.assign(List(tp2).asJava) - consumeRecords(this.consumers.head, numRecords, topic = tp2.topic) - } - - protected def setClusterCreateAclsAndProduce(tp: TopicPartition) { - AclCommand.main(clusterCreateAclArgs) - AclCommand.main(topicWriteAclArgs(tp.topic)) - AclCommand.main(consumeAclArgs(tp.topic)) - servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, new Resource(Topic, tp.topic)) - TestUtils.waitAndVerifyAcls(ClusterBrokerActionAcl ++ ClusterCreateAcl, s.apis.authorizer.get, clusterResource) - TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) - } - sendRecords(numRecords, tp) - } - protected def setAclsAndProduce(tp: TopicPartition) { AclCommand.main(produceAclArgs(tp.topic)) AclCommand.main(consumeAclArgs(tp.topic)) From dc2727e4b07ccebbf3de735e343c57dd0a52c417 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 5 Jun 2018 23:43:28 -0700 Subject: [PATCH 15/17] Fix bug in KafkaApis --- core/src/main/scala/kafka/server/KafkaApis.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 08be97f0168b4..98672c8287a27 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1035,21 +1035,21 @@ class KafkaApis(val requestChannel: RequestChannel, var (authorizedTopics, unauthorizedForDescribeTopics) = topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic))) - var authorizedForDescribeNotCreateTopics = Set[String]() + var unauthorizedForCreateTopics = Set[String]() if (authorizedTopics.nonEmpty) { val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) if (metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) { if (!authorize(request.session, Create, Resource.ClusterResource)) { - val unauthorizedForCreateTopics = authorizedTopics.filter( - topic => !authorize(request.session, Create, new Resource(Topic, topic))) + unauthorizedForCreateTopics = nonExistingTopics.filter { topic => + !authorize(request.session, Create, new Resource(Topic, topic)) + } authorizedTopics --= unauthorizedForCreateTopics - authorizedForDescribeNotCreateTopics ++= unauthorizedForCreateTopics } } } - val unauthorizedForCreateTopicMetadata = authorizedForDescribeNotCreateTopics.map(topic => + val unauthorizedForCreateTopicMetadata = unauthorizedForCreateTopics.map(topic => new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, isInternal(topic), java.util.Collections.emptyList())) From c37272faf3e0ba28e0a931dba01a10cdf38bfd86 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 5 Jun 2018 23:43:43 -0700 Subject: [PATCH 16/17] Minor tweaks in AuthorizerIntegrationTest --- .../kafka/api/AuthorizerIntegrationTest.scala | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 6a77c5c32ac8c..ea5a155b5dcca 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -846,18 +846,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest { addAndVerifyAcls(Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)), newTopicResource) addAndVerifyAcls(groupReadAcl(groupResource), groupResource) this.consumers.head.assign(List(topicPartition).asJava) - try { - this.consumers.head.poll(Duration.ofMillis(50L)); - Assert.fail("should have thrown Authorization Exception") - } catch { - case e: TopicAuthorizationException => - assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) - } + val unauthorizedTopics = intercept[TopicAuthorizationException] { + (0 until 10).foreach(_ => consumers.head.poll(Duration.ofMillis(50L))) + }.unauthorizedTopics + assertEquals(Collections.singleton(newTopic), unauthorizedTopics) val resource = if (resType == Topic) newTopicResource else Resource.ClusterResource addAndVerifyAcls(acls, resource) - // need to retry to avoid using a long timeout TestUtils.waitUntilTrue(() => { this.consumers.head.poll(Duration.ofMillis(50L)) this.zkClient.topicExists(newTopic) From 0276cf0ee03a9c5ba21cf2f164ebd51efde4b57c Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 5 Jun 2018 23:53:41 -0700 Subject: [PATCH 17/17] Remove dead code and some renames in EndToEndAuthorizationTest --- .../kafka/api/EndToEndAuthorizationTest.scala | 23 ++++--------------- 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 214af028db02b..3808de6ec696a 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -60,7 +60,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas override val serverCount = 3 override def configureSecurityBeforeServersStart() { - AclCommand.main(clusterBrokerAclArgs) + AclCommand.main(clusterActionArgs) AclCommand.main(topicBrokerReadAclArgs) } @@ -83,7 +83,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas val wildcardGroupResource = new Resource(Group, wildcard) // Arguments to AclCommand to set ACLs. - def clusterBrokerAclArgs: Array[String] = Array("--authorizer-properties", + def clusterActionArgs: Array[String] = Array("--authorizer-properties", s"zookeeper.connect=$zkConnect", s"--add", s"--cluster", @@ -95,7 +95,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas s"--topic=$wildcard", s"--operation=Read", s"--allow-principal=$kafkaPrincipalType:$kafkaPrincipal") - def produceAclArgs(topic:String): Array[String] = Array("--authorizer-properties", + def produceAclArgs(topic: String): Array[String] = Array("--authorizer-properties", s"zookeeper.connect=$zkConnect", s"--add", s"--topic=$topic", @@ -142,27 +142,14 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas s"--consumer", s"--producer", s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") - def clusterCreateAclArgs: Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--cluster", - s"--operation=Create", - s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") - def topicWriteAclArgs(topic:String): Array[String] = Array("--authorizer-properties", - s"zookeeper.connect=$zkConnect", - s"--add", - s"--topic=$topic", - s"--operation=Write", - s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") - def ClusterBrokerActionAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, ClusterAction)) + def ClusterActionAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, ClusterAction)) def TopicBrokerReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, kafkaPrincipal), Allow, Acl.WildCardHost, Read)) def GroupReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Read)) def TopicReadAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Read)) def TopicWriteAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Write)) def TopicDescribeAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Describe)) def TopicCreateAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Create)) - def ClusterCreateAcl = Set(new Acl(new KafkaPrincipal(kafkaPrincipalType, clientPrincipal), Allow, Acl.WildCardHost, Create)) // The next two configuration parameters enable ZooKeeper secure ACLs // and sets the Kafka authorizer, both necessary to enable security. this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") @@ -181,7 +168,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas override def setUp() { super.setUp() servers.foreach { s => - TestUtils.waitAndVerifyAcls(ClusterBrokerActionAcl, s.apis.authorizer.get, Resource.ClusterResource) + TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, Resource.ClusterResource) TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, "*")) } // create the test topic with all the brokers as replicas