From 3ee1ef5578bcc235338ddb8c689212264456404f Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Fri, 25 Feb 2022 10:21:12 -0800 Subject: [PATCH 1/3] KAFKA-13697; KRaft authorizer should support AclOperation.ALL --- .../kafka/api/AuthorizerIntegrationTest.scala | 49 +++++++++++++++++ .../authorizer/StandardAuthorizerData.java | 13 +++-- .../authorizer/StandardAuthorizerTest.java | 54 +++++++++++++++++-- 3 files changed, 107 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 4a08cc67b8ac7..3337d67ec7ac3 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -68,6 +68,8 @@ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{CsvSource, ValueSource} import java.util.Collections.singletonList +import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic + import scala.annotation.nowarn import scala.collection.mutable import scala.jdk.CollectionConverters._ @@ -2353,6 +2355,51 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } } + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeTopicAclWithOperationAll(quorum: String): Unit = { + createTopicWithBrokerPrincipal(topic) + removeAllClientAcls() + + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WildcardHost, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) + + val metadataRequestTopic = new MetadataRequestTopic() + .setName(topic) + + val metadataRequest = new MetadataRequest.Builder(new MetadataRequestData() + .setTopics(Collections.singletonList(metadataRequestTopic)) + .setAllowAutoTopicCreation(false) + ).build() + + val metadataResponse = connectAndReceive[MetadataResponse](metadataRequest) + val topicResponseOpt = metadataResponse.topicMetadata().asScala.find(_.topic == topic) + assertTrue(topicResponseOpt.isDefined) + + val topicResponse = topicResponseOpt.get + assertEquals(Errors.NONE, topicResponse.error) + } + + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testDescribeTopicConfigsAclWithOperationAll(quorum: String): Unit = { + createTopicWithBrokerPrincipal(topic) + removeAllClientAcls() + + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WildcardHost, ALL, ALLOW) + addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) + + val describeConfigsRequest = new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData() + .setResources(Collections.singletonList(new DescribeConfigsRequestData.DescribeConfigsResource() + .setResourceType(ConfigResource.Type.TOPIC.id) + .setResourceName(tp.topic))) + ).build() + + val describeConfigsResponse = connectAndReceive[DescribeConfigsResponse](describeConfigsRequest) + val topicConfigResponse = describeConfigsResponse.data.results.get(0) + assertEquals(Errors.NONE, Errors.forCode(topicConfigResponse.errorCode)) + } + private def testMetadataClusterClusterAuthorizedOperations( version: Short, expectedClusterAuthorizedOperations: Int @@ -2528,6 +2575,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { topic: String, numPartitions: Int = 1 ): Unit = { + // Note the principal builder implementation maps all connections on the + // inter-broker listener to the broker principal. createTopic( topic, numPartitions = numPartitions, diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java index 697e84239de69..4b88336017ac5 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java @@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; +import static org.apache.kafka.common.acl.AclOperation.ALL; import static org.apache.kafka.common.acl.AclOperation.ALTER; import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS; import static org.apache.kafka.common.acl.AclOperation.DELETE; @@ -342,13 +343,13 @@ void checkSection(Action action, * The set of operations which imply DESCRIBE permission, when used in an ALLOW acl. */ private static final Set IMPLIES_DESCRIBE = Collections.unmodifiableSet( - EnumSet.of(DESCRIBE, READ, WRITE, DELETE, ALTER)); + EnumSet.of(DESCRIBE, READ, WRITE, DELETE, ALTER, ALL)); /** * The set of operations which imply DESCRIBE_CONFIGS permission, when used in an ALLOW acl. */ private static final Set IMPLIES_DESCRIBE_CONFIGS = Collections.unmodifiableSet( - EnumSet.of(DESCRIBE_CONFIGS, ALTER_CONFIGS)); + EnumSet.of(DESCRIBE_CONFIGS, ALTER_CONFIGS, ALL)); /** * Determine what the result of applying an ACL to the given action and request @@ -391,11 +392,13 @@ static AuthorizationResult findResult(Action action, if (!IMPLIES_DESCRIBE_CONFIGS.contains(acl.operation())) return null; break; default: - if (!action.operation().equals(acl.operation())) return null; + if (acl.operation() != ALL && action.operation() != acl.operation()) { + return null; + } break; } - } else { - if (!action.operation().equals(acl.operation())) return null; + } else if (acl.operation() != ALL && action.operation() != acl.operation()) { + return null; } return acl.permissionType().equals(ALLOW) ? ALLOWED : DENIED; } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java index c56ad45d77fb6..2059370de088a 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.server.authorizer.Action; +import org.apache.kafka.server.authorizer.AuthorizableRequestContext; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -41,6 +42,8 @@ import java.util.concurrent.atomic.AtomicLong; import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.common.acl.AclOperation.ALL; import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS; import static org.apache.kafka.common.acl.AclOperation.CREATE; import static org.apache.kafka.common.acl.AclOperation.DESCRIBE; @@ -250,14 +253,57 @@ public void testSimpleAuthorizations() throws Exception { withId(newBarAcl(ALTER_CONFIGS, ALLOW))); fooAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl())); barAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl())); - assertEquals(Collections.singletonList(ALLOWED), + assertEquals(singletonList(ALLOWED), authorizer.authorize(new MockAuthorizableRequestContext.Builder(). setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(), - Collections.singletonList(newAction(READ, TOPIC, "foo_")))); - assertEquals(Collections.singletonList(ALLOWED), + singletonList(newAction(READ, TOPIC, "foo_")))); + assertEquals(singletonList(ALLOWED), authorizer.authorize(new MockAuthorizableRequestContext.Builder(). setPrincipal(new KafkaPrincipal(USER_TYPE, "fred")).build(), - Collections.singletonList(newAction(ALTER_CONFIGS, GROUP, "bar")))); + singletonList(newAction(ALTER_CONFIGS, GROUP, "bar")))); + } + + @Test + public void testTopicAclWithOperationAll() throws Exception { + StandardAuthorizer authorizer = new StandardAuthorizer(); + authorizer.configure(Collections.emptyMap()); + List acls = Arrays.asList( + new StandardAcl(TOPIC, "foo", LITERAL, "User:*", "*", ALL, ALLOW), + new StandardAcl(TOPIC, "bar", PREFIXED, "User:alice", "*", ALL, ALLOW), + new StandardAcl(TOPIC, "baz", LITERAL, "User:bob", "*", ALL, ALLOW) + ); + + acls.forEach(acl -> { + StandardAclWithId aclWithId = withId(acl); + authorizer.addAcl(aclWithId.id(), aclWithId.acl()); + }); + + assertEquals(Arrays.asList(ALLOWED, ALLOWED, DENIED), authorizer.authorize( + newRequestContext("alice"), + Arrays.asList( + newAction(WRITE, TOPIC, "foo"), + newAction(DESCRIBE_CONFIGS, TOPIC, "bar"), + newAction(DESCRIBE, TOPIC, "baz")))); + + assertEquals(Arrays.asList(ALLOWED, DENIED, DENIED), authorizer.authorize( + newRequestContext("bob"), + Arrays.asList( + newAction(WRITE, TOPIC, "foo"), + newAction(READ, TOPIC, "bar"), + newAction(DESCRIBE, TOPIC, "baz")))); + + assertEquals(Arrays.asList(ALLOWED, DENIED, DENIED), authorizer.authorize( + newRequestContext("malory"), + Arrays.asList( + newAction(DESCRIBE, TOPIC, "foo"), + newAction(WRITE, TOPIC, "bar"), + newAction(READ, TOPIC, "baz")))); + } + + private AuthorizableRequestContext newRequestContext(String principal) throws Exception { + return new MockAuthorizableRequestContext.Builder() + .setPrincipal(new KafkaPrincipal(USER_TYPE, principal)) + .build(); } private static StandardAuthorizer createAuthorizerWithManyAcls() { From 0496b28219c83cbe9115fedf451644e0a34edf7c Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Fri, 25 Feb 2022 10:48:39 -0800 Subject: [PATCH 2/3] Consolidate ALL check and fix broken test --- .../authorizer/StandardAuthorizerData.java | 37 ++++++++++--------- .../authorizer/StandardAuthorizerTest.java | 2 +- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java index 4b88336017ac5..8fee9f5e4066a 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java @@ -343,13 +343,13 @@ void checkSection(Action action, * The set of operations which imply DESCRIBE permission, when used in an ALLOW acl. */ private static final Set IMPLIES_DESCRIBE = Collections.unmodifiableSet( - EnumSet.of(DESCRIBE, READ, WRITE, DELETE, ALTER, ALL)); + EnumSet.of(DESCRIBE, READ, WRITE, DELETE, ALTER)); /** * The set of operations which imply DESCRIBE_CONFIGS permission, when used in an ALLOW acl. */ private static final Set IMPLIES_DESCRIBE_CONFIGS = Collections.unmodifiableSet( - EnumSet.of(DESCRIBE_CONFIGS, ALTER_CONFIGS, ALL)); + EnumSet.of(DESCRIBE_CONFIGS, ALTER_CONFIGS)); /** * Determine what the result of applying an ACL to the given action and request @@ -383,23 +383,26 @@ static AuthorizationResult findResult(Action action, // // But this rule only applies to ALLOW ACLs. So for example, a DENY ACL for READ // on a resource does not DENY describe for that resource. - if (acl.permissionType().equals(ALLOW)) { - switch (action.operation()) { - case DESCRIBE: - if (!IMPLIES_DESCRIBE.contains(acl.operation())) return null; - break; - case DESCRIBE_CONFIGS: - if (!IMPLIES_DESCRIBE_CONFIGS.contains(acl.operation())) return null; - break; - default: - if (acl.operation() != ALL && action.operation() != acl.operation()) { - return null; - } - break; + if (acl.operation() != ALL) { + if (acl.permissionType().equals(ALLOW)) { + switch (action.operation()) { + case DESCRIBE: + if (!IMPLIES_DESCRIBE.contains(acl.operation())) return null; + break; + case DESCRIBE_CONFIGS: + if (!IMPLIES_DESCRIBE_CONFIGS.contains(acl.operation())) return null; + break; + default: + if (action.operation() != acl.operation()) { + return null; + } + break; + } + } else if (action.operation() != acl.operation()) { + return null; } - } else if (acl.operation() != ALL && action.operation() != acl.operation()) { - return null; } + return acl.permissionType().equals(ALLOW) ? ALLOWED : DENIED; } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java index 2059370de088a..5eac232209833 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java @@ -285,7 +285,7 @@ public void testTopicAclWithOperationAll() throws Exception { newAction(DESCRIBE_CONFIGS, TOPIC, "bar"), newAction(DESCRIBE, TOPIC, "baz")))); - assertEquals(Arrays.asList(ALLOWED, DENIED, DENIED), authorizer.authorize( + assertEquals(Arrays.asList(ALLOWED, DENIED, ALLOWED), authorizer.authorize( newRequestContext("bob"), Arrays.asList( newAction(WRITE, TOPIC, "foo"), From 9867345001f0e3022dee0651554167a8c45c895f Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Fri, 25 Feb 2022 11:14:58 -0800 Subject: [PATCH 3/3] Add test case for AclOperation.ALL asserting deny precedence --- .../authorizer/StandardAuthorizerTest.java | 38 ++++++++++++++++++- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java index 5eac232209833..734a96989c7be 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java @@ -44,14 +44,14 @@ import static java.util.Arrays.asList; import static java.util.Collections.singletonList; import static org.apache.kafka.common.acl.AclOperation.ALL; +import static org.apache.kafka.common.acl.AclOperation.ALTER; import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS; import static org.apache.kafka.common.acl.AclOperation.CREATE; +import static org.apache.kafka.common.acl.AclOperation.DELETE; import static org.apache.kafka.common.acl.AclOperation.DESCRIBE; import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS; import static org.apache.kafka.common.acl.AclOperation.READ; import static org.apache.kafka.common.acl.AclOperation.WRITE; -import static org.apache.kafka.common.acl.AclOperation.DELETE; -import static org.apache.kafka.common.acl.AclOperation.ALTER; import static org.apache.kafka.common.acl.AclPermissionType.ALLOW; import static org.apache.kafka.common.acl.AclPermissionType.DENY; import static org.apache.kafka.common.resource.PatternType.LITERAL; @@ -263,6 +263,40 @@ public void testSimpleAuthorizations() throws Exception { singletonList(newAction(ALTER_CONFIGS, GROUP, "bar")))); } + @Test + public void testDenyPrecedenceWithOperationAll() throws Exception { + StandardAuthorizer authorizer = new StandardAuthorizer(); + authorizer.configure(Collections.emptyMap()); + List acls = Arrays.asList( + new StandardAcl(TOPIC, "foo", LITERAL, "User:alice", "*", ALL, DENY), + new StandardAcl(TOPIC, "foo", PREFIXED, "User:alice", "*", READ, ALLOW), + new StandardAcl(TOPIC, "foo", LITERAL, "User:*", "*", ALL, DENY), + new StandardAcl(TOPIC, "foo", PREFIXED, "User:*", "*", DESCRIBE, ALLOW) + ); + + acls.forEach(acl -> { + StandardAclWithId aclWithId = withId(acl); + authorizer.addAcl(aclWithId.id(), aclWithId.acl()); + }); + + assertEquals(Arrays.asList(DENIED, DENIED, DENIED, ALLOWED), authorizer.authorize( + newRequestContext("alice"), + Arrays.asList( + newAction(WRITE, TOPIC, "foo"), + newAction(READ, TOPIC, "foo"), + newAction(DESCRIBE, TOPIC, "foo"), + newAction(READ, TOPIC, "foobar")))); + + assertEquals(Arrays.asList(DENIED, DENIED, DENIED, ALLOWED, DENIED), authorizer.authorize( + newRequestContext("bob"), + Arrays.asList( + newAction(DESCRIBE, TOPIC, "foo"), + newAction(READ, TOPIC, "foo"), + newAction(WRITE, TOPIC, "foo"), + newAction(DESCRIBE, TOPIC, "foobaz"), + newAction(READ, TOPIC, "foobaz")))); + } + @Test public void testTopicAclWithOperationAll() throws Exception { StandardAuthorizer authorizer = new StandardAuthorizer();