Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions clients/src/main/resources/common/message/MetadataRequest.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"apiKey": 3,
"type": "request",
"name": "MetadataRequest",
"validVersions": "0-10",
"validVersions": "0-11",
"flexibleVersions": "9+",
"fields": [
// In version 0, an empty array indicates "request metadata for all topics." In version 1 and
Expand All @@ -31,7 +31,11 @@
// Starting in version 8, authorized operations can be requested for cluster and topic resource.
//
// Version 9 is the first flexible version.
// Version 10 add topicId
//
// Version 10 adds topicId.
//
// Version 11 deprecates IncludeClusterAuthorizedOperations field. This is now exposed
// by the DescribeCluster API (KIP-700).
{ "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", "nullableVersions": "1+",
"about": "The topics to fetch metadata for.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, "about": "The topic id." },
Expand All @@ -40,7 +44,7 @@
]},
{ "name": "AllowAutoTopicCreation", "type": "bool", "versions": "4+", "default": "true", "ignorable": false,
"about": "If this is true, the broker may auto-create topics that we requested which do not already exist, if it is configured to do so." },
{ "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "8+",
{ "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "8-10",
"about": "Whether to include cluster authorized operations." },
{ "name": "IncludeTopicAuthorizedOperations", "type": "bool", "versions": "8+",
"about": "Whether to include topic authorized operations." }
Expand Down
10 changes: 7 additions & 3 deletions clients/src/main/resources/common/message/MetadataResponse.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@
// Starting in version 8, brokers can send authorized operations for topic and cluster.
//
// Version 9 is the first flexible version.
// Version 10 add topicId
"validVersions": "0-10",
//
// Version 10 adds topicId.
//
// Version 11 deprecates ClusterAuthorizedOperations. This is now exposed
// by the DescribeCluster API (KIP-700).
"validVersions": "0-11",
"flexibleVersions": "9+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
Expand Down Expand Up @@ -86,7 +90,7 @@
{ "name": "TopicAuthorizedOperations", "type": "int32", "versions": "8+", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this topic." }
]},
{ "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8+", "default": "-2147483648",
{ "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8-10", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this cluster." }
]
}
18 changes: 10 additions & 8 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1245,14 +1245,16 @@ class KafkaApis(val requestChannel: RequestChannel,
)
}

var clusterAuthorizedOperations = Int.MinValue
if (request.header.apiVersion >= 8) {
var clusterAuthorizedOperations = Int.MinValue // Default value in the schema
if (requestVersion >= 8) {
// get cluster authorized operations
if (metadataRequest.data.includeClusterAuthorizedOperations) {
if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME))
clusterAuthorizedOperations = authHelper.authorizedOperations(request, Resource.CLUSTER)
else
clusterAuthorizedOperations = 0
if (requestVersion <= 10) {
if (metadataRequest.data.includeClusterAuthorizedOperations) {
if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME))
clusterAuthorizedOperations = authHelper.authorizedOperations(request, Resource.CLUSTER)
else
clusterAuthorizedOperations = 0
}
}

// get topic authorized operations
Expand Down Expand Up @@ -3215,7 +3217,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleDescribeCluster(request: RequestChannel.Request): Unit = {
val describeClusterRequest = request.body[DescribeClusterRequest]

var clusterAuthorizedOperations = Int.MinValue
var clusterAuthorizedOperations = Int.MinValue // Default value in the schema
// get cluster authorized operations
if (describeClusterRequest.data.includeClusterAuthorizedOperations) {
if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
import org.apache.kafka.common.message.MetadataRequestData
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection
Expand Down Expand Up @@ -1853,17 +1854,62 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
shouldIdempotentProducerFailInInitProducerId(true)
}

@Test
def testMetadataClusterAuthorizedOperationsWithoutDescribeCluster(): Unit = {
removeAllClientAcls()

// MetadataRequest versions older than 1 are not supported.
for (version <- 1 to ApiKeys.METADATA.latestVersion) {
testMetadataClusterClusterAuthorizedOperations(version.toShort, 0)
}
}

@Test
def testMetadataClusterAuthorizedOperationsWithDescribeAndAlterCluster(): Unit = {
removeAllClientAcls()

val clusterResource = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
val acls = Set(
new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW),
new AccessControlEntry(clientPrincipalString, WildcardHost, ALTER, ALLOW)
)
addAndVerifyAcls(acls, clusterResource)

val expectedClusterAuthorizedOperations = Utils.to32BitField(
acls.map(_.operation.code.asInstanceOf[JByte]).asJava)

// MetadataRequest versions older than 1 are not supported.
for (version <- 1 to ApiKeys.METADATA.latestVersion) {
testMetadataClusterClusterAuthorizedOperations(version.toShort, expectedClusterAuthorizedOperations)
}
}

private def testMetadataClusterClusterAuthorizedOperations(
version: Short,
expectedClusterAuthorizedOperations: Int
): Unit = {
val metadataRequest = new MetadataRequest.Builder(new MetadataRequestData()
.setTopics(Collections.emptyList())
.setAllowAutoTopicCreation(true)
.setIncludeClusterAuthorizedOperations(true))
.build(version)

// The expected value is only verified if the request supports it.
if (version >= 8 && version <= 10) {
val metadataResponse = connectAndReceive[MetadataResponse](metadataRequest)
assertEquals(expectedClusterAuthorizedOperations, metadataResponse.data.clusterAuthorizedOperations)
} else {
assertThrows(classOf[UnsupportedVersionException],
() => connectAndReceive[MetadataResponse](metadataRequest))
}
}

@Test
def testDescribeClusterClusterAuthorizedOperationsWithoutDescribeCluster(): Unit = {
removeAllClientAcls()

for (version <- ApiKeys.DESCRIBE_CLUSTER.oldestVersion to ApiKeys.DESCRIBE_CLUSTER.latestVersion) {
val describeClusterRequest = new DescribeClusterRequest.Builder(new DescribeClusterRequestData()
.setIncludeClusterAuthorizedOperations(true))
.build(version.toShort)
val describeClusterResponse = connectAndReceive[DescribeClusterResponse](describeClusterRequest)

assertEquals(0, describeClusterResponse.data.clusterAuthorizedOperations)
testDescribeClusterClusterAuthorizedOperations(version.toShort, 0)
}
}

Expand All @@ -1882,15 +1928,22 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
acls.map(_.operation.code.asInstanceOf[JByte]).asJava)

for (version <- ApiKeys.DESCRIBE_CLUSTER.oldestVersion to ApiKeys.DESCRIBE_CLUSTER.latestVersion) {
val describeClusterRequest = new DescribeClusterRequest.Builder(new DescribeClusterRequestData()
.setIncludeClusterAuthorizedOperations(true))
.build(version.toShort)
val describeClusterResponse = connectAndReceive[DescribeClusterResponse](describeClusterRequest)

assertEquals(expectedClusterAuthorizedOperations, describeClusterResponse.data.clusterAuthorizedOperations)
testDescribeClusterClusterAuthorizedOperations(version.toShort, expectedClusterAuthorizedOperations)
}
}

private def testDescribeClusterClusterAuthorizedOperations(
version: Short,
expectedClusterAuthorizedOperations: Int
): Unit = {
val describeClusterRequest = new DescribeClusterRequest.Builder(new DescribeClusterRequestData()
.setIncludeClusterAuthorizedOperations(true))
.build(version)

val describeClusterResponse = connectAndReceive[DescribeClusterResponse](describeClusterRequest)
assertEquals(expectedClusterAuthorizedOperations, describeClusterResponse.data.clusterAuthorizedOperations)
}

def removeAllClientAcls(): Unit = {
val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
val aclEntryFilter = new AccessControlEntryFilter(clientPrincipalString, null, AclOperation.ANY, AclPermissionType.ANY)
Expand Down