Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3833,6 +3833,7 @@ class KafkaApis(val requestChannel: RequestChannel,

def handleConsumerGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = {
val consumerGroupDescribeRequest = request.body[ConsumerGroupDescribeRequest]
val includeAuthorizedOperations = consumerGroupDescribeRequest.data.includeAuthorizedOperations

if (!isConsumerGroupProtocolEnabled()) {
// The API is not supported by the "old" group coordinator (the default). If the
Expand Down Expand Up @@ -3861,6 +3862,17 @@ class KafkaApis(val requestChannel: RequestChannel,
if (exception != null) {
requestHelper.sendMaybeThrottle(request, consumerGroupDescribeRequest.getErrorResponse(exception))
} else {
if (includeAuthorizedOperations) {
results.forEach { groupResult =>
if (groupResult.errorCode == Errors.NONE.code) {
groupResult.setAuthorizedOperations(authHelper.authorizedOperations(
request,
new Resource(ResourceType.GROUP, groupResult.groupId)
))
}
}
}

if (response.groups.isEmpty) {
// If the response is empty, we can directly reuse the results.
response.setGroups(results)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,24 @@
*/
package kafka.server

import kafka.server.GroupCoordinatorBaseRequestTest
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, Type}
import kafka.test.annotation._
import kafka.test.junit.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.common.ConsumerGroupState
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assignment, DescribedGroup, TopicPartitions}
import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatResponseData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse}
import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.common.Features
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{Tag, Timeout}

import java.lang.{Byte => JByte}
import scala.jdk.CollectionConverters._

@Timeout(120)
Expand Down Expand Up @@ -116,6 +119,9 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC
val timeoutMs = 5 * 60 * 1000
val clientId = "client-id"
val clientHost = "/127.0.0.1"
val authorizedOperationsInt = Utils.to32BitField(
AclEntry.supportedOperations(ResourceType.GROUP).asScala
.map(_.code.asInstanceOf[JByte]).asJava)

// Add first group with one member.
var grp1Member1Response: ConsumerGroupHeartbeatResponseData = null
Expand Down Expand Up @@ -162,6 +168,7 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC
.setGroupEpoch(1)
.setAssignmentEpoch(1)
.setAssignorName("uniform")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp1Member1Response.memberId)
Expand All @@ -177,6 +184,7 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC
.setGroupEpoch(grp2Member2Response.memberEpoch)
.setAssignmentEpoch(grp2Member2Response.memberEpoch)
.setAssignorName("range")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp2Member2Response.memberId)
Expand Down Expand Up @@ -219,7 +227,8 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC

val actual = consumerGroupDescribe(
groupIds = List("grp-1", "grp-2"),
version = version.toShort
includeAuthorizedOperations = true,
version = version.toShort,
)

assertEquals(expected, actual)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,10 +421,13 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {

protected def consumerGroupDescribe(
groupIds: List[String],
includeAuthorizedOperations: Boolean,
version: Short = ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)
): List[ConsumerGroupDescribeResponseData.DescribedGroup] = {
val consumerGroupDescribeRequest = new ConsumerGroupDescribeRequest.Builder(
new ConsumerGroupDescribeRequestData().setGroupIds(groupIds.asJava)
new ConsumerGroupDescribeRequestData()
.setGroupIds(groupIds.asJava)
.setIncludeAuthorizedOperations(includeAuthorizedOperations)
).build(version)

val consumerGroupDescribeResponse = connectAndReceive[ConsumerGroupDescribeResponse](consumerGroupDescribeRequest)
Expand Down
28 changes: 22 additions & 6 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
Expand All @@ -92,6 +93,7 @@ import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}

import java.lang.{Byte => JByte}
import java.net.InetAddress
import java.nio.charset.StandardCharsets
import java.time.Duration
Expand Down Expand Up @@ -7113,8 +7115,9 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
}

@Test
def testConsumerGroupDescribe(): Unit = {
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testConsumerGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
when(metadataCache.features()).thenReturn {
new FinalizedFeatures(
Expand All @@ -7127,6 +7130,7 @@ class KafkaApisTest extends Logging {

val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData()
.setIncludeAuthorizedOperations(includeAuthorizedOperations)
consumerGroupDescribeRequestData.groupIds.addAll(groupIds)
val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build())

Expand All @@ -7141,15 +7145,27 @@ class KafkaApisTest extends Logging {
)
kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)

val describedGroups = List(
future.complete(List(
new DescribedGroup().setGroupId(groupIds.get(0)),
new DescribedGroup().setGroupId(groupIds.get(1)),
new DescribedGroup().setGroupId(groupIds.get(2))
).asJava
).asJava)

future.complete(describedGroups)
var authorizedOperationsInt = Int.MinValue;
if (includeAuthorizedOperations) {
authorizedOperationsInt = Utils.to32BitField(
AclEntry.supportedOperations(ResourceType.GROUP).asScala
.map(_.code.asInstanceOf[JByte]).asJava)
}

// Can't reuse the above list here because we would not test the implementation in KafkaApis then
val describedGroups = List(
new DescribedGroup().setGroupId(groupIds.get(0)),
new DescribedGroup().setGroupId(groupIds.get(1)),
new DescribedGroup().setGroupId(groupIds.get(2))
).map(group => group.setAuthorizedOperations(authorizedOperationsInt))
val expectedConsumerGroupDescribeResponseData = new ConsumerGroupDescribeResponseData()
.setGroups(describedGroups)
.setGroups(describedGroups.asJava)

val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)

Expand Down