diff --git a/clients/src/main/resources/common/message/MetadataRequest.json b/clients/src/main/resources/common/message/MetadataRequest.json index 30316e669f247..02af116a1c210 100644 --- a/clients/src/main/resources/common/message/MetadataRequest.json +++ b/clients/src/main/resources/common/message/MetadataRequest.json @@ -17,7 +17,7 @@ "apiKey": 3, "type": "request", "name": "MetadataRequest", - "validVersions": "0-10", + "validVersions": "0-11", "flexibleVersions": "9+", "fields": [ // In version 0, an empty array indicates "request metadata for all topics." In version 1 and @@ -31,7 +31,11 @@ // Starting in version 8, authorized operations can be requested for cluster and topic resource. // // Version 9 is the first flexible version. - // Version 10 add topicId + // + // Version 10 adds topicId. + // + // Version 11 deprecates IncludeClusterAuthorizedOperations field. This is now exposed + // by the DescribeCluster API (KIP-700). { "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", "nullableVersions": "1+", "about": "The topics to fetch metadata for.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, "about": "The topic id." }, @@ -40,7 +44,7 @@ ]}, { "name": "AllowAutoTopicCreation", "type": "bool", "versions": "4+", "default": "true", "ignorable": false, "about": "If this is true, the broker may auto-create topics that we requested which do not already exist, if it is configured to do so." }, - { "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "8+", + { "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "8-10", "about": "Whether to include cluster authorized operations." }, { "name": "IncludeTopicAuthorizedOperations", "type": "bool", "versions": "8+", "about": "Whether to include topic authorized operations." } diff --git a/clients/src/main/resources/common/message/MetadataResponse.json b/clients/src/main/resources/common/message/MetadataResponse.json index b54b830d458f7..70638d2f4fa21 100644 --- a/clients/src/main/resources/common/message/MetadataResponse.json +++ b/clients/src/main/resources/common/message/MetadataResponse.json @@ -36,8 +36,12 @@ // Starting in version 8, brokers can send authorized operations for topic and cluster. // // Version 9 is the first flexible version. - // Version 10 add topicId - "validVersions": "0-10", + // + // Version 10 adds topicId. + // + // Version 11 deprecates ClusterAuthorizedOperations. This is now exposed + // by the DescribeCluster API (KIP-700). + "validVersions": "0-11", "flexibleVersions": "9+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true, @@ -86,7 +90,7 @@ { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "8+", "default": "-2147483648", "about": "32-bit bitfield to represent authorized operations for this topic." } ]}, - { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8+", "default": "-2147483648", + { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8-10", "default": "-2147483648", "about": "32-bit bitfield to represent authorized operations for this cluster." } ] } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 09c1ba49aca99..17c25860a9003 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1245,14 +1245,16 @@ class KafkaApis(val requestChannel: RequestChannel, ) } - var clusterAuthorizedOperations = Int.MinValue - if (request.header.apiVersion >= 8) { + var clusterAuthorizedOperations = Int.MinValue // Default value in the schema + if (requestVersion >= 8) { // get cluster authorized operations - if (metadataRequest.data.includeClusterAuthorizedOperations) { - if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) - clusterAuthorizedOperations = authHelper.authorizedOperations(request, Resource.CLUSTER) - else - clusterAuthorizedOperations = 0 + if (requestVersion <= 10) { + if (metadataRequest.data.includeClusterAuthorizedOperations) { + if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) + clusterAuthorizedOperations = authHelper.authorizedOperations(request, Resource.CLUSTER) + else + clusterAuthorizedOperations = 0 + } } // get topic authorized operations @@ -3215,7 +3217,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleDescribeCluster(request: RequestChannel.Request): Unit = { val describeClusterRequest = request.body[DescribeClusterRequest] - var clusterAuthorizedOperations = Int.MinValue + var clusterAuthorizedOperations = Int.MinValue // Default value in the schema // get cluster authorized operations if (describeClusterRequest.data.includeClusterAuthorizedOperations) { if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index edda7a9838628..a2cc596c1fcc0 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -44,6 +44,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} +import org.apache.kafka.common.message.MetadataRequestData import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection @@ -1853,17 +1854,62 @@ class AuthorizerIntegrationTest extends BaseRequestTest { shouldIdempotentProducerFailInInitProducerId(true) } + @Test + def testMetadataClusterAuthorizedOperationsWithoutDescribeCluster(): Unit = { + removeAllClientAcls() + + // MetadataRequest versions older than 1 are not supported. + for (version <- 1 to ApiKeys.METADATA.latestVersion) { + testMetadataClusterClusterAuthorizedOperations(version.toShort, 0) + } + } + + @Test + def testMetadataClusterAuthorizedOperationsWithDescribeAndAlterCluster(): Unit = { + removeAllClientAcls() + + val clusterResource = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL) + val acls = Set( + new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW), + new AccessControlEntry(clientPrincipalString, WildcardHost, ALTER, ALLOW) + ) + addAndVerifyAcls(acls, clusterResource) + + val expectedClusterAuthorizedOperations = Utils.to32BitField( + acls.map(_.operation.code.asInstanceOf[JByte]).asJava) + + // MetadataRequest versions older than 1 are not supported. + for (version <- 1 to ApiKeys.METADATA.latestVersion) { + testMetadataClusterClusterAuthorizedOperations(version.toShort, expectedClusterAuthorizedOperations) + } + } + + private def testMetadataClusterClusterAuthorizedOperations( + version: Short, + expectedClusterAuthorizedOperations: Int + ): Unit = { + val metadataRequest = new MetadataRequest.Builder(new MetadataRequestData() + .setTopics(Collections.emptyList()) + .setAllowAutoTopicCreation(true) + .setIncludeClusterAuthorizedOperations(true)) + .build(version) + + // The expected value is only verified if the request supports it. + if (version >= 8 && version <= 10) { + val metadataResponse = connectAndReceive[MetadataResponse](metadataRequest) + assertEquals(expectedClusterAuthorizedOperations, metadataResponse.data.clusterAuthorizedOperations) + } else { + assertThrows(classOf[UnsupportedVersionException], + () => connectAndReceive[MetadataResponse](metadataRequest)) + } + } + @Test def testDescribeClusterClusterAuthorizedOperationsWithoutDescribeCluster(): Unit = { removeAllClientAcls() for (version <- ApiKeys.DESCRIBE_CLUSTER.oldestVersion to ApiKeys.DESCRIBE_CLUSTER.latestVersion) { - val describeClusterRequest = new DescribeClusterRequest.Builder(new DescribeClusterRequestData() - .setIncludeClusterAuthorizedOperations(true)) - .build(version.toShort) - val describeClusterResponse = connectAndReceive[DescribeClusterResponse](describeClusterRequest) - - assertEquals(0, describeClusterResponse.data.clusterAuthorizedOperations) + testDescribeClusterClusterAuthorizedOperations(version.toShort, 0) } } @@ -1882,15 +1928,22 @@ class AuthorizerIntegrationTest extends BaseRequestTest { acls.map(_.operation.code.asInstanceOf[JByte]).asJava) for (version <- ApiKeys.DESCRIBE_CLUSTER.oldestVersion to ApiKeys.DESCRIBE_CLUSTER.latestVersion) { - val describeClusterRequest = new DescribeClusterRequest.Builder(new DescribeClusterRequestData() - .setIncludeClusterAuthorizedOperations(true)) - .build(version.toShort) - val describeClusterResponse = connectAndReceive[DescribeClusterResponse](describeClusterRequest) - - assertEquals(expectedClusterAuthorizedOperations, describeClusterResponse.data.clusterAuthorizedOperations) + testDescribeClusterClusterAuthorizedOperations(version.toShort, expectedClusterAuthorizedOperations) } } + private def testDescribeClusterClusterAuthorizedOperations( + version: Short, + expectedClusterAuthorizedOperations: Int + ): Unit = { + val describeClusterRequest = new DescribeClusterRequest.Builder(new DescribeClusterRequestData() + .setIncludeClusterAuthorizedOperations(true)) + .build(version) + + val describeClusterResponse = connectAndReceive[DescribeClusterResponse](describeClusterRequest) + assertEquals(expectedClusterAuthorizedOperations, describeClusterResponse.data.clusterAuthorizedOperations) + } + def removeAllClientAcls(): Unit = { val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get val aclEntryFilter = new AccessControlEntryFilter(clientPrincipalString, null, AclOperation.ANY, AclPermissionType.ANY)