From 259fe86df4209833864d64c49205f047ed44635b Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 21 Jan 2021 09:14:17 +0100 Subject: [PATCH 1/2] KAFKA-12212; Bump Metadata API version to remove `ClusterAuthorizedOperations` fields --- .../common/message/MetadataRequest.json | 9 ++- .../common/message/MetadataResponse.json | 9 ++- .../main/scala/kafka/server/KafkaApis.scala | 14 ++-- .../kafka/api/AuthorizerIntegrationTest.scala | 77 ++++++++++++++++--- 4 files changed, 85 insertions(+), 24 deletions(-) diff --git a/clients/src/main/resources/common/message/MetadataRequest.json b/clients/src/main/resources/common/message/MetadataRequest.json index 30316e669f247..e0f1f5e4df5d5 100644 --- a/clients/src/main/resources/common/message/MetadataRequest.json +++ b/clients/src/main/resources/common/message/MetadataRequest.json @@ -17,7 +17,7 @@ "apiKey": 3, "type": "request", "name": "MetadataRequest", - "validVersions": "0-10", + "validVersions": "0-11", "flexibleVersions": "9+", "fields": [ // In version 0, an empty array indicates "request metadata for all topics." In version 1 and @@ -31,7 +31,10 @@ // Starting in version 8, authorized operations can be requested for cluster and topic resource. // // Version 9 is the first flexible version. - // Version 10 add topicId + // + // Version 10 adds topicId. + // + // Version 11 deprecates IncludeClusterAuthorizedOperations field (KIP-700). { "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", "nullableVersions": "1+", "about": "The topics to fetch metadata for.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, "about": "The topic id." }, @@ -40,7 +43,7 @@ ]}, { "name": "AllowAutoTopicCreation", "type": "bool", "versions": "4+", "default": "true", "ignorable": false, "about": "If this is true, the broker may auto-create topics that we requested which do not already exist, if it is configured to do so." }, - { "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "8+", + { "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "8-10", "about": "Whether to include cluster authorized operations." }, { "name": "IncludeTopicAuthorizedOperations", "type": "bool", "versions": "8+", "about": "Whether to include topic authorized operations." } diff --git a/clients/src/main/resources/common/message/MetadataResponse.json b/clients/src/main/resources/common/message/MetadataResponse.json index b54b830d458f7..12aa34d22a668 100644 --- a/clients/src/main/resources/common/message/MetadataResponse.json +++ b/clients/src/main/resources/common/message/MetadataResponse.json @@ -36,8 +36,11 @@ // Starting in version 8, brokers can send authorized operations for topic and cluster. // // Version 9 is the first flexible version. - // Version 10 add topicId - "validVersions": "0-10", + // + // Version 10 adds topicId. + // + // Version 11 deprecates ClusterAuthorizedOperations (KIP-700). + "validVersions": "0-11", "flexibleVersions": "9+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true, @@ -86,7 +89,7 @@ { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "8+", "default": "-2147483648", "about": "32-bit bitfield to represent authorized operations for this topic." } ]}, - { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8+", "default": "-2147483648", + { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8-10", "default": "-2147483648", "about": "32-bit bitfield to represent authorized operations for this cluster." } ] } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 09c1ba49aca99..eae6bbeb14928 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1246,13 +1246,15 @@ class KafkaApis(val requestChannel: RequestChannel, } var clusterAuthorizedOperations = Int.MinValue - if (request.header.apiVersion >= 8) { + if (requestVersion >= 8) { // get cluster authorized operations - if (metadataRequest.data.includeClusterAuthorizedOperations) { - if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) - clusterAuthorizedOperations = authHelper.authorizedOperations(request, Resource.CLUSTER) - else - clusterAuthorizedOperations = 0 + if (requestVersion <= 10) { + if (metadataRequest.data.includeClusterAuthorizedOperations) { + if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) + clusterAuthorizedOperations = authHelper.authorizedOperations(request, Resource.CLUSTER) + else + clusterAuthorizedOperations = 0 + } } // get topic authorized operations diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index edda7a9838628..a2cc596c1fcc0 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -44,6 +44,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} +import org.apache.kafka.common.message.MetadataRequestData import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection @@ -1853,17 +1854,62 @@ class AuthorizerIntegrationTest extends BaseRequestTest { shouldIdempotentProducerFailInInitProducerId(true) } + @Test + def testMetadataClusterAuthorizedOperationsWithoutDescribeCluster(): Unit = { + removeAllClientAcls() + + // MetadataRequest versions older than 1 are not supported. + for (version <- 1 to ApiKeys.METADATA.latestVersion) { + testMetadataClusterClusterAuthorizedOperations(version.toShort, 0) + } + } + + @Test + def testMetadataClusterAuthorizedOperationsWithDescribeAndAlterCluster(): Unit = { + removeAllClientAcls() + + val clusterResource = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL) + val acls = Set( + new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW), + new AccessControlEntry(clientPrincipalString, WildcardHost, ALTER, ALLOW) + ) + addAndVerifyAcls(acls, clusterResource) + + val expectedClusterAuthorizedOperations = Utils.to32BitField( + acls.map(_.operation.code.asInstanceOf[JByte]).asJava) + + // MetadataRequest versions older than 1 are not supported. + for (version <- 1 to ApiKeys.METADATA.latestVersion) { + testMetadataClusterClusterAuthorizedOperations(version.toShort, expectedClusterAuthorizedOperations) + } + } + + private def testMetadataClusterClusterAuthorizedOperations( + version: Short, + expectedClusterAuthorizedOperations: Int + ): Unit = { + val metadataRequest = new MetadataRequest.Builder(new MetadataRequestData() + .setTopics(Collections.emptyList()) + .setAllowAutoTopicCreation(true) + .setIncludeClusterAuthorizedOperations(true)) + .build(version) + + // The expected value is only verified if the request supports it. + if (version >= 8 && version <= 10) { + val metadataResponse = connectAndReceive[MetadataResponse](metadataRequest) + assertEquals(expectedClusterAuthorizedOperations, metadataResponse.data.clusterAuthorizedOperations) + } else { + assertThrows(classOf[UnsupportedVersionException], + () => connectAndReceive[MetadataResponse](metadataRequest)) + } + } + @Test def testDescribeClusterClusterAuthorizedOperationsWithoutDescribeCluster(): Unit = { removeAllClientAcls() for (version <- ApiKeys.DESCRIBE_CLUSTER.oldestVersion to ApiKeys.DESCRIBE_CLUSTER.latestVersion) { - val describeClusterRequest = new DescribeClusterRequest.Builder(new DescribeClusterRequestData() - .setIncludeClusterAuthorizedOperations(true)) - .build(version.toShort) - val describeClusterResponse = connectAndReceive[DescribeClusterResponse](describeClusterRequest) - - assertEquals(0, describeClusterResponse.data.clusterAuthorizedOperations) + testDescribeClusterClusterAuthorizedOperations(version.toShort, 0) } } @@ -1882,15 +1928,22 @@ class AuthorizerIntegrationTest extends BaseRequestTest { acls.map(_.operation.code.asInstanceOf[JByte]).asJava) for (version <- ApiKeys.DESCRIBE_CLUSTER.oldestVersion to ApiKeys.DESCRIBE_CLUSTER.latestVersion) { - val describeClusterRequest = new DescribeClusterRequest.Builder(new DescribeClusterRequestData() - .setIncludeClusterAuthorizedOperations(true)) - .build(version.toShort) - val describeClusterResponse = connectAndReceive[DescribeClusterResponse](describeClusterRequest) - - assertEquals(expectedClusterAuthorizedOperations, describeClusterResponse.data.clusterAuthorizedOperations) + testDescribeClusterClusterAuthorizedOperations(version.toShort, expectedClusterAuthorizedOperations) } } + private def testDescribeClusterClusterAuthorizedOperations( + version: Short, + expectedClusterAuthorizedOperations: Int + ): Unit = { + val describeClusterRequest = new DescribeClusterRequest.Builder(new DescribeClusterRequestData() + .setIncludeClusterAuthorizedOperations(true)) + .build(version) + + val describeClusterResponse = connectAndReceive[DescribeClusterResponse](describeClusterRequest) + assertEquals(expectedClusterAuthorizedOperations, describeClusterResponse.data.clusterAuthorizedOperations) + } + def removeAllClientAcls(): Unit = { val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get val aclEntryFilter = new AccessControlEntryFilter(clientPrincipalString, null, AclOperation.ANY, AclPermissionType.ANY) From 0373353fbcc31d6ed0f102abd79050958789a26f Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 21 Jan 2021 16:33:18 +0100 Subject: [PATCH 2/2] Address comments --- .../src/main/resources/common/message/MetadataRequest.json | 3 ++- .../src/main/resources/common/message/MetadataResponse.json | 3 ++- core/src/main/scala/kafka/server/KafkaApis.scala | 4 ++-- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/clients/src/main/resources/common/message/MetadataRequest.json b/clients/src/main/resources/common/message/MetadataRequest.json index e0f1f5e4df5d5..02af116a1c210 100644 --- a/clients/src/main/resources/common/message/MetadataRequest.json +++ b/clients/src/main/resources/common/message/MetadataRequest.json @@ -34,7 +34,8 @@ // // Version 10 adds topicId. // - // Version 11 deprecates IncludeClusterAuthorizedOperations field (KIP-700). + // Version 11 deprecates IncludeClusterAuthorizedOperations field. This is now exposed + // by the DescribeCluster API (KIP-700). { "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", "nullableVersions": "1+", "about": "The topics to fetch metadata for.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, "about": "The topic id." }, diff --git a/clients/src/main/resources/common/message/MetadataResponse.json b/clients/src/main/resources/common/message/MetadataResponse.json index 12aa34d22a668..70638d2f4fa21 100644 --- a/clients/src/main/resources/common/message/MetadataResponse.json +++ b/clients/src/main/resources/common/message/MetadataResponse.json @@ -39,7 +39,8 @@ // // Version 10 adds topicId. // - // Version 11 deprecates ClusterAuthorizedOperations (KIP-700). + // Version 11 deprecates ClusterAuthorizedOperations. This is now exposed + // by the DescribeCluster API (KIP-700). "validVersions": "0-11", "flexibleVersions": "9+", "fields": [ diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index eae6bbeb14928..17c25860a9003 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1245,7 +1245,7 @@ class KafkaApis(val requestChannel: RequestChannel, ) } - var clusterAuthorizedOperations = Int.MinValue + var clusterAuthorizedOperations = Int.MinValue // Default value in the schema if (requestVersion >= 8) { // get cluster authorized operations if (requestVersion <= 10) { @@ -3217,7 +3217,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleDescribeCluster(request: RequestChannel.Request): Unit = { val describeClusterRequest = request.body[DescribeClusterRequest] - var clusterAuthorizedOperations = Int.MinValue + var clusterAuthorizedOperations = Int.MinValue // Default value in the schema // get cluster authorized operations if (describeClusterRequest.data.includeClusterAuthorizedOperations) { if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME))