diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 4a08cc67b8ac7..3337d67ec7ac3 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -68,6 +68,8 @@ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{CsvSource, ValueSource} import java.util.Collections.singletonList +import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic + import scala.annotation.nowarn import scala.collection.mutable import scala.jdk.CollectionConverters._ @@ -2353,6 +2355,51 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } } + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeTopicAclWithOperationAll(quorum: String): Unit = { + createTopicWithBrokerPrincipal(topic) + removeAllClientAcls() + + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WildcardHost, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) + + val metadataRequestTopic = new MetadataRequestTopic() + .setName(topic) + + val metadataRequest = new MetadataRequest.Builder(new MetadataRequestData() + .setTopics(Collections.singletonList(metadataRequestTopic)) + .setAllowAutoTopicCreation(false) + ).build() + + val metadataResponse = connectAndReceive[MetadataResponse](metadataRequest) + val topicResponseOpt = metadataResponse.topicMetadata().asScala.find(_.topic == topic) + assertTrue(topicResponseOpt.isDefined) + + val topicResponse = topicResponseOpt.get + assertEquals(Errors.NONE, topicResponse.error) + } + + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeTopicConfigsAclWithOperationAll(quorum: String): Unit = { + createTopicWithBrokerPrincipal(topic) + removeAllClientAcls() + + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WildcardHost, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) + + val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData() + .setResources(Collections.singletonList(new DescribeConfigsRequestData.DescribeConfigsResource() + .setResourceType(ConfigResource.Type.TOPIC.id) + .setResourceName(tp.topic))) + ).build() + + val describeConfigsResponse = connectAndReceive[DescribeConfigsResponse](describeConfigsRequest) + val topicConfigResponse = describeConfigsResponse.data.results.get(0) + assertEquals(Errors.NONE, Errors.forCode(topicConfigResponse.errorCode)) + } + private def testMetadataClusterClusterAuthorizedOperations( version: Short, expectedClusterAuthorizedOperations: Int @@ -2528,6 +2575,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { topic: String, numPartitions: Int = 1 ): Unit = { + // Note the principal builder implementation maps all connections on the + // inter-broker listener to the broker principal. createTopic( topic, numPartitions = numPartitions, diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java index 697e84239de69..8fee9f5e4066a 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java @@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; +import static org.apache.kafka.common.acl.AclOperation.ALL; import static org.apache.kafka.common.acl.AclOperation.ALTER; import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS; import static org.apache.kafka.common.acl.AclOperation.DELETE; @@ -382,21 +383,26 @@ static AuthorizationResult findResult(Action action, // // But this rule only applies to ALLOW ACLs. So for example, a DENY ACL for READ // on a resource does not DENY describe for that resource. - if (acl.permissionType().equals(ALLOW)) { - switch (action.operation()) { - case DESCRIBE: - if (!IMPLIES_DESCRIBE.contains(acl.operation())) return null; - break; - case DESCRIBE_CONFIGS: - if (!IMPLIES_DESCRIBE_CONFIGS.contains(acl.operation())) return null; - break; - default: - if (!action.operation().equals(acl.operation())) return null; - break; + if (acl.operation() != ALL) { + if (acl.permissionType().equals(ALLOW)) { + switch (action.operation()) { + case DESCRIBE: + if (!IMPLIES_DESCRIBE.contains(acl.operation())) return null; + break; + case DESCRIBE_CONFIGS: + if (!IMPLIES_DESCRIBE_CONFIGS.contains(acl.operation())) return null; + break; + default: + if (action.operation() != acl.operation()) { + return null; + } + break; + } + } else if (action.operation() != acl.operation()) { + return null; } - } else { - if (!action.operation().equals(acl.operation())) return null; } + return acl.permissionType().equals(ALLOW) ? ALLOWED : DENIED; } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java index c56ad45d77fb6..734a96989c7be 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.server.authorizer.Action; +import org.apache.kafka.server.authorizer.AuthorizableRequestContext; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -41,14 +42,16 @@ import java.util.concurrent.atomic.AtomicLong; import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.common.acl.AclOperation.ALL; +import static org.apache.kafka.common.acl.AclOperation.ALTER; import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS; import static org.apache.kafka.common.acl.AclOperation.CREATE; +import static org.apache.kafka.common.acl.AclOperation.DELETE; import static org.apache.kafka.common.acl.AclOperation.DESCRIBE; import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS; import static org.apache.kafka.common.acl.AclOperation.READ; import static org.apache.kafka.common.acl.AclOperation.WRITE; -import static org.apache.kafka.common.acl.AclOperation.DELETE; -import static org.apache.kafka.common.acl.AclOperation.ALTER; import static org.apache.kafka.common.acl.AclPermissionType.ALLOW; import static org.apache.kafka.common.acl.AclPermissionType.DENY; import static org.apache.kafka.common.resource.PatternType.LITERAL; @@ -250,14 +253,91 @@ public void testSimpleAuthorizations() throws Exception { withId(newBarAcl(ALTER_CONFIGS, ALLOW))); fooAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl())); barAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl())); - assertEquals(Collections.singletonList(ALLOWED), + assertEquals(singletonList(ALLOWED), authorizer.authorize(new MockAuthorizableRequestContext.Builder(). setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(), - Collections.singletonList(newAction(READ, TOPIC, "foo_")))); - assertEquals(Collections.singletonList(ALLOWED), + singletonList(newAction(READ, TOPIC, "foo_")))); + assertEquals(singletonList(ALLOWED), authorizer.authorize(new MockAuthorizableRequestContext.Builder(). setPrincipal(new KafkaPrincipal(USER_TYPE, "fred")).build(), - Collections.singletonList(newAction(ALTER_CONFIGS, GROUP, "bar")))); + singletonList(newAction(ALTER_CONFIGS, GROUP, "bar")))); + } + + @Test + public void testDenyPrecedenceWithOperationAll() throws Exception { + StandardAuthorizer authorizer = new StandardAuthorizer(); + authorizer.configure(Collections.emptyMap()); + List acls = Arrays.asList( + new StandardAcl(TOPIC, "foo", LITERAL, "User:alice", "*", ALL, DENY), + new StandardAcl(TOPIC, "foo", PREFIXED, "User:alice", "*", READ, ALLOW), + new StandardAcl(TOPIC, "foo", LITERAL, "User:*", "*", ALL, DENY), + new StandardAcl(TOPIC, "foo", PREFIXED, "User:*", "*", DESCRIBE, ALLOW) + ); + + acls.forEach(acl -> { + StandardAclWithId aclWithId = withId(acl); + authorizer.addAcl(aclWithId.id(), aclWithId.acl()); + }); + + assertEquals(Arrays.asList(DENIED, DENIED, DENIED, ALLOWED), authorizer.authorize( + newRequestContext("alice"), + Arrays.asList( + newAction(WRITE, TOPIC, "foo"), + newAction(READ, TOPIC, "foo"), + newAction(DESCRIBE, TOPIC, "foo"), + newAction(READ, TOPIC, "foobar")))); + + assertEquals(Arrays.asList(DENIED, DENIED, DENIED, ALLOWED, DENIED), authorizer.authorize( + newRequestContext("bob"), + Arrays.asList( + newAction(DESCRIBE, TOPIC, "foo"), + newAction(READ, TOPIC, "foo"), + newAction(WRITE, TOPIC, "foo"), + newAction(DESCRIBE, TOPIC, "foobaz"), + newAction(READ, TOPIC, "foobaz")))); + } + + @Test + public void testTopicAclWithOperationAll() throws Exception { + StandardAuthorizer authorizer = new StandardAuthorizer(); + authorizer.configure(Collections.emptyMap()); + List acls = Arrays.asList( + new StandardAcl(TOPIC, "foo", LITERAL, "User:*", "*", ALL, ALLOW), + new StandardAcl(TOPIC, "bar", PREFIXED, "User:alice", "*", ALL, ALLOW), + new StandardAcl(TOPIC, "baz", LITERAL, "User:bob", "*", ALL, ALLOW) + ); + + acls.forEach(acl -> { + StandardAclWithId aclWithId = withId(acl); + authorizer.addAcl(aclWithId.id(), aclWithId.acl()); + }); + + assertEquals(Arrays.asList(ALLOWED, ALLOWED, DENIED), authorizer.authorize( + newRequestContext("alice"), + Arrays.asList( + newAction(WRITE, TOPIC, "foo"), + newAction(DESCRIBE_CONFIGS, TOPIC, "bar"), + newAction(DESCRIBE, TOPIC, "baz")))); + + assertEquals(Arrays.asList(ALLOWED, DENIED, ALLOWED), authorizer.authorize( + newRequestContext("bob"), + Arrays.asList( + newAction(WRITE, TOPIC, "foo"), + newAction(READ, TOPIC, "bar"), + newAction(DESCRIBE, TOPIC, "baz")))); + + assertEquals(Arrays.asList(ALLOWED, DENIED, DENIED), authorizer.authorize( + newRequestContext("malory"), + Arrays.asList( + newAction(DESCRIBE, TOPIC, "foo"), + newAction(WRITE, TOPIC, "bar"), + newAction(READ, TOPIC, "baz")))); + } + + private AuthorizableRequestContext newRequestContext(String principal) throws Exception { + return new MockAuthorizableRequestContext.Builder() + .setPrincipal(new KafkaPrincipal(USER_TYPE, principal)) + .build(); } private static StandardAuthorizer createAuthorizerWithManyAcls() {