From a11b977c8c0d024682b183d7f50f71b12343d49a Mon Sep 17 00:00:00 2001 From: Max Riedel Date: Fri, 31 May 2024 20:34:38 +0200 Subject: [PATCH 1/5] KAFKA-14509: Handle includeAuthorizedOperations --- core/src/main/scala/kafka/server/KafkaApis.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 70258e8dac522..1536997c2253a 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3824,6 +3824,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleConsumerGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = { val consumerGroupDescribeRequest = request.body[ConsumerGroupDescribeRequest] + val includeAuthorizedOperations = consumerGroupDescribeRequest.data.includeAuthorizedOperations if (!config.isNewGroupCoordinatorEnabled) { // The API is not supported by the "old" group coordinator (the default). If the @@ -3852,6 +3853,17 @@ class KafkaApis(val requestChannel: RequestChannel, if (exception != null) { requestHelper.sendMaybeThrottle(request, consumerGroupDescribeRequest.getErrorResponse(exception)) } else { + if (request.header.apiVersion >= 3 && includeAuthorizedOperations) { + results.forEach { groupResult => + if (groupResult.errorCode == Errors.NONE.code) { + groupResult.setAuthorizedOperations(authHelper.authorizedOperations( + request, + new Resource(ResourceType.GROUP, groupResult.groupId) + )) + } + } + } + if (response.groups.isEmpty) { // If the response is empty, we can directly reuse the results. response.setGroups(results) From 2228d2aa7592203968487aee28646abbb215347a Mon Sep 17 00:00:00 2001 From: Max Riedel Date: Sun, 2 Jun 2024 17:59:53 +0200 Subject: [PATCH 2/5] KAFKA-14509: Extend unit tests to cover authorizedOperations --- core/src/main/scala/kafka/server/KafkaApis.scala | 2 +- .../scala/unit/kafka/server/KafkaApisTest.scala | 14 ++++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 1536997c2253a..2a3021b5f3208 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -3853,7 +3853,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (exception != null) { requestHelper.sendMaybeThrottle(request, consumerGroupDescribeRequest.getErrorResponse(exception)) } else { - if (request.header.apiVersion >= 3 && includeAuthorizedOperations) { + if (includeAuthorizedOperations) { results.forEach { groupResult => if (groupResult.errorCode == Errors.NONE.code) { groupResult.setAuthorizedOperations(authHelper.authorizedOperations( diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 52b55ffc33fce..03f7e70d12ef5 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -7086,6 +7086,7 @@ class KafkaApisTest extends Logging { def testConsumerGroupDescribe(): Unit = { val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() + .setIncludeAuthorizedOperations(true) consumerGroupDescribeRequestData.groupIds.addAll(groupIds) val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build()) @@ -7099,15 +7100,20 @@ class KafkaApisTest extends Logging { ) kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) - val describedGroups = List( + future.complete(List( new DescribedGroup().setGroupId(groupIds.get(0)), new DescribedGroup().setGroupId(groupIds.get(1)), new DescribedGroup().setGroupId(groupIds.get(2)) - ).asJava + ).asJava) - future.complete(describedGroups) + // Can't reuse the above list here because we would not test the implementation in KafkaApis then + val describedGroups = List( + new DescribedGroup().setGroupId(groupIds.get(0)), + new DescribedGroup().setGroupId(groupIds.get(1)), + new DescribedGroup().setGroupId(groupIds.get(2)) + ).map(group => group.setAuthorizedOperations(328)) // Integer representation of authorized operations for this request val expectedConsumerGroupDescribeResponseData = new ConsumerGroupDescribeResponseData() - .setGroups(describedGroups) + .setGroups(describedGroups.asJava) val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest) From 26b134305d11726a3e4faf4ca122f307bb14c598 Mon Sep 17 00:00:00 2001 From: Max Riedel Date: Fri, 7 Jun 2024 12:33:24 +0200 Subject: [PATCH 3/5] KAFKA-14509: Parameterize Unit Test --- .../test/scala/unit/kafka/server/KafkaApisTest.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 03f7e70d12ef5..ba9f7a1dad650 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -7082,11 +7082,12 @@ class KafkaApisTest extends Logging { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) } - @Test - def testConsumerGroupDescribe(): Unit = { + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testConsumerGroupDescribe(includeAuthorizedOperations: Boolean): Unit = { val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() - .setIncludeAuthorizedOperations(true) + .setIncludeAuthorizedOperations(includeAuthorizedOperations) consumerGroupDescribeRequestData.groupIds.addAll(groupIds) val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build()) @@ -7107,11 +7108,12 @@ class KafkaApisTest extends Logging { ).asJava) // Can't reuse the above list here because we would not test the implementation in KafkaApis then + val authorizedOperationsInt = if (includeAuthorizedOperations) 328 else Int.MinValue; val describedGroups = List( new DescribedGroup().setGroupId(groupIds.get(0)), new DescribedGroup().setGroupId(groupIds.get(1)), new DescribedGroup().setGroupId(groupIds.get(2)) - ).map(group => group.setAuthorizedOperations(328)) // Integer representation of authorized operations for this request + ).map(group => group.setAuthorizedOperations(authorizedOperationsInt)) // Integer representation of authorized operations for this request val expectedConsumerGroupDescribeResponseData = new ConsumerGroupDescribeResponseData() .setGroups(describedGroups.asJava) From 1d9bf29f84599ad951f3898a46667caa12bf14a5 Mon Sep 17 00:00:00 2001 From: Max Riedel Date: Fri, 7 Jun 2024 16:52:03 +0200 Subject: [PATCH 4/5] KAFKA-14509: extend integration test with includeAuthorizedOperations flag --- .../kafka/server/ConsumerGroupDescribeRequestsTest.scala | 7 +++++-- .../kafka/server/GroupCoordinatorBaseRequestTest.scala | 5 ++++- core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | 4 ++-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala index b2e4e0f85f1d2..e4b53b5e8ce32 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala @@ -16,7 +16,6 @@ */ package kafka.server -import kafka.server.GroupCoordinatorBaseRequestTest import kafka.test.ClusterInstance import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} import kafka.test.junit.ClusterTestExtensions @@ -80,6 +79,7 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC val timeoutMs = 5 * 60 * 1000 val clientId = "client-id" val clientHost = "/127.0.0.1" + val authorizedOperationsInt = 328; // Integer representation of the authorized operations for this request // Add first group with one member. var grp1Member1Response: ConsumerGroupHeartbeatResponseData = null @@ -126,6 +126,7 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC .setGroupEpoch(1) .setAssignmentEpoch(1) .setAssignorName("uniform") + .setAuthorizedOperations(authorizedOperationsInt) .setMembers(List( new ConsumerGroupDescribeResponseData.Member() .setMemberId(grp1Member1Response.memberId) @@ -141,6 +142,7 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC .setGroupEpoch(grp2Member2Response.memberEpoch) .setAssignmentEpoch(grp2Member2Response.memberEpoch) .setAssignorName("range") + .setAuthorizedOperations(authorizedOperationsInt) .setMembers(List( new ConsumerGroupDescribeResponseData.Member() .setMemberId(grp2Member2Response.memberId) @@ -183,7 +185,8 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC val actual = consumerGroupDescribe( groupIds = List("grp-1", "grp-2"), - version = version.toShort + includeAuthorizedOperations = true, + version = version.toShort, ) assertEquals(expected, actual) diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala index 847bdf3225f54..9fad21476e73c 100644 --- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala @@ -421,10 +421,13 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { protected def consumerGroupDescribe( groupIds: List[String], + includeAuthorizedOperations: Boolean, version: Short = ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled) ): List[ConsumerGroupDescribeResponseData.DescribedGroup] = { val consumerGroupDescribeRequest = new ConsumerGroupDescribeRequest.Builder( - new ConsumerGroupDescribeRequestData().setGroupIds(groupIds.asJava) + new ConsumerGroupDescribeRequestData() + .setGroupIds(groupIds.asJava) + .setIncludeAuthorizedOperations(includeAuthorizedOperations) ).build(version) val consumerGroupDescribeResponse = connectAndReceive[ConsumerGroupDescribeResponse](consumerGroupDescribeRequest) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index ba9f7a1dad650..3666ed9732a9e 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -7108,12 +7108,12 @@ class KafkaApisTest extends Logging { ).asJava) // Can't reuse the above list here because we would not test the implementation in KafkaApis then - val authorizedOperationsInt = if (includeAuthorizedOperations) 328 else Int.MinValue; + val authorizedOperationsInt = if (includeAuthorizedOperations) 328 else Int.MinValue; // 328: Integer representation of authorized operations for this request val describedGroups = List( new DescribedGroup().setGroupId(groupIds.get(0)), new DescribedGroup().setGroupId(groupIds.get(1)), new DescribedGroup().setGroupId(groupIds.get(2)) - ).map(group => group.setAuthorizedOperations(authorizedOperationsInt)) // Integer representation of authorized operations for this request + ).map(group => group.setAuthorizedOperations(authorizedOperationsInt)) val expectedConsumerGroupDescribeResponseData = new ConsumerGroupDescribeResponseData() .setGroups(describedGroups.asJava) From df98af100be3759624a769faea7b1e198ea5885d Mon Sep 17 00:00:00 2001 From: Max Riedel Date: Fri, 7 Jun 2024 18:15:41 +0200 Subject: [PATCH 5/5] KAFKA-14509: replace magic number --- .../server/ConsumerGroupDescribeRequestsTest.scala | 10 ++++++++-- .../test/scala/unit/kafka/server/KafkaApisTest.scala | 10 +++++++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala index 9b7c464debf7e..0e745f33d5bb3 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala @@ -17,7 +17,7 @@ package kafka.server import kafka.test.ClusterInstance -import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.annotation._ import kafka.test.junit.ClusterTestExtensions import kafka.utils.TestUtils import org.apache.kafka.common.ConsumerGroupState @@ -25,11 +25,15 @@ import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assign import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatResponseData} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse} +import org.apache.kafka.common.resource.ResourceType +import org.apache.kafka.common.utils.Utils +import org.apache.kafka.security.authorizer.AclEntry import org.apache.kafka.server.common.Features import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.{Tag, Timeout} +import java.lang.{Byte => JByte} import scala.jdk.CollectionConverters._ @Timeout(120) @@ -115,7 +119,9 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC val timeoutMs = 5 * 60 * 1000 val clientId = "client-id" val clientHost = "/127.0.0.1" - val authorizedOperationsInt = 328; // Integer representation of the authorized operations for this request + val authorizedOperationsInt = Utils.to32BitField( + AclEntry.supportedOperations(ResourceType.GROUP).asScala + .map(_.code.asInstanceOf[JByte]).asJava) // Add first group with one member. var grp1Member1Response: ConsumerGroupHeartbeatResponseData = null diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 7dd19161cded6..2113a46124c7d 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -76,6 +76,7 @@ import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils} import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig} import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.raft.QuorumConfig +import org.apache.kafka.security.authorizer.AclEntry import org.apache.kafka.server.ClientMetricsManager import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer} import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1} @@ -92,6 +93,7 @@ import org.mockito.ArgumentMatchers._ import org.mockito.Mockito._ import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito} +import java.lang.{Byte => JByte} import java.net.InetAddress import java.nio.charset.StandardCharsets import java.time.Duration @@ -7151,8 +7153,14 @@ class KafkaApisTest extends Logging { new DescribedGroup().setGroupId(groupIds.get(2)) ).asJava) + var authorizedOperationsInt = Int.MinValue; + if (includeAuthorizedOperations) { + authorizedOperationsInt = Utils.to32BitField( + AclEntry.supportedOperations(ResourceType.GROUP).asScala + .map(_.code.asInstanceOf[JByte]).asJava) + } + // Can't reuse the above list here because we would not test the implementation in KafkaApis then - val authorizedOperationsInt = if (includeAuthorizedOperations) 328 else Int.MinValue; // 328: Integer representation of authorized operations for this request val describedGroups = List( new DescribedGroup().setGroupId(groupIds.get(0)), new DescribedGroup().setGroupId(groupIds.get(1)),