Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
import java.util.Collections.singletonList

import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic

import scala.annotation.nowarn
import scala.collection.mutable
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -2353,6 +2355,51 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeTopicAclWithOperationAll(quorum: String): Unit = {
  createTopicWithBrokerPrincipal(topic)
  removeAllClientAcls()

  // Grant ALL operations on the topic: Metadata (DESCRIBE) must be implied by ALL.
  addAndVerifyAcls(
    Set(new AccessControlEntry(clientPrincipalString, WildcardHost, ALL, ALLOW)),
    topicResource)

  val request = new MetadataRequest.Builder(
    new MetadataRequestData()
      .setTopics(singletonList(new MetadataRequestTopic().setName(topic)))
      .setAllowAutoTopicCreation(false)
  ).build()

  val response = connectAndReceive[MetadataResponse](request)
  val maybeTopicMetadata = response.topicMetadata().asScala.find(_.topic == topic)
  assertTrue(maybeTopicMetadata.isDefined)
  assertEquals(Errors.NONE, maybeTopicMetadata.get.error)
}

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testDescribeTopicConfigsAclWithOperationAll(quorum: String): Unit = {
  createTopicWithBrokerPrincipal(topic)
  removeAllClientAcls()

  // Grant ALL operations on the topic: DescribeConfigs (DESCRIBE_CONFIGS) must be implied by ALL.
  addAndVerifyAcls(
    Set(new AccessControlEntry(clientPrincipalString, WildcardHost, ALL, ALLOW)),
    topicResource)

  val resource = new DescribeConfigsRequestData.DescribeConfigsResource()
    .setResourceType(ConfigResource.Type.TOPIC.id)
    .setResourceName(tp.topic)
  val request = new DescribeConfigsRequest.Builder(
    new DescribeConfigsRequestData().setResources(Collections.singletonList(resource))
  ).build()

  val response = connectAndReceive[DescribeConfigsResponse](request)
  val result = response.data.results.get(0)
  assertEquals(Errors.NONE, Errors.forCode(result.errorCode))
}

private def testMetadataClusterClusterAuthorizedOperations(
version: Short,
expectedClusterAuthorizedOperations: Int
Expand Down Expand Up @@ -2528,6 +2575,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
topic: String,
numPartitions: Int = 1
): Unit = {
// Note the principal builder implementation maps all connections on the
// inter-broker listener to the broker principal.
createTopic(
topic,
numPartitions = numPartitions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;

import static org.apache.kafka.common.acl.AclOperation.ALL;
import static org.apache.kafka.common.acl.AclOperation.ALTER;
import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
import static org.apache.kafka.common.acl.AclOperation.DELETE;
Expand Down Expand Up @@ -382,21 +383,26 @@ static AuthorizationResult findResult(Action action,
//
// But this rule only applies to ALLOW ACLs. So for example, a DENY ACL for READ
// on a resource does not DENY describe for that resource.
if (acl.permissionType().equals(ALLOW)) {
switch (action.operation()) {
case DESCRIBE:
if (!IMPLIES_DESCRIBE.contains(acl.operation())) return null;
break;
case DESCRIBE_CONFIGS:
if (!IMPLIES_DESCRIBE_CONFIGS.contains(acl.operation())) return null;
break;
default:
if (!action.operation().equals(acl.operation())) return null;
break;
if (acl.operation() != ALL) {
if (acl.permissionType().equals(ALLOW)) {
switch (action.operation()) {
case DESCRIBE:
if (!IMPLIES_DESCRIBE.contains(acl.operation())) return null;
break;
case DESCRIBE_CONFIGS:
if (!IMPLIES_DESCRIBE_CONFIGS.contains(acl.operation())) return null;
break;
default:
if (action.operation() != acl.operation()) {
return null;
}
break;
}
} else if (action.operation() != acl.operation()) {
return null;
}
} else {
if (!action.operation().equals(acl.operation())) return null;
}

return acl.permissionType().equals(ALLOW) ? ALLOWED : DENIED;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

Expand All @@ -41,14 +42,16 @@
import java.util.concurrent.atomic.AtomicLong;

import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.common.acl.AclOperation.ALL;
import static org.apache.kafka.common.acl.AclOperation.ALTER;
import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
import static org.apache.kafka.common.acl.AclOperation.CREATE;
import static org.apache.kafka.common.acl.AclOperation.DELETE;
import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
import static org.apache.kafka.common.acl.AclOperation.READ;
import static org.apache.kafka.common.acl.AclOperation.WRITE;
import static org.apache.kafka.common.acl.AclOperation.DELETE;
import static org.apache.kafka.common.acl.AclOperation.ALTER;
import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
import static org.apache.kafka.common.acl.AclPermissionType.DENY;
import static org.apache.kafka.common.resource.PatternType.LITERAL;
Expand Down Expand Up @@ -250,14 +253,91 @@ public void testSimpleAuthorizations() throws Exception {
withId(newBarAcl(ALTER_CONFIGS, ALLOW)));
fooAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl()));
barAcls.forEach(a -> authorizer.addAcl(a.id(), a.acl()));
assertEquals(Collections.singletonList(ALLOWED),
assertEquals(singletonList(ALLOWED),
authorizer.authorize(new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
Collections.singletonList(newAction(READ, TOPIC, "foo_"))));
assertEquals(Collections.singletonList(ALLOWED),
singletonList(newAction(READ, TOPIC, "foo_"))));
assertEquals(singletonList(ALLOWED),
authorizer.authorize(new MockAuthorizableRequestContext.Builder().
setPrincipal(new KafkaPrincipal(USER_TYPE, "fred")).build(),
Collections.singletonList(newAction(ALTER_CONFIGS, GROUP, "bar"))));
singletonList(newAction(ALTER_CONFIGS, GROUP, "bar"))));
}

@Test
public void testDenyPrecedenceWithOperationAll() throws Exception {
    StandardAuthorizer authorizer = new StandardAuthorizer();
    authorizer.configure(Collections.emptyMap());

    // A DENY ACL with operation ALL must take precedence over any matching ALLOW,
    // whether the ALLOW names a specific operation or relies on the DESCRIBE implication.
    for (StandardAcl acl : Arrays.asList(
            new StandardAcl(TOPIC, "foo", LITERAL, "User:alice", "*", ALL, DENY),
            new StandardAcl(TOPIC, "foo", PREFIXED, "User:alice", "*", READ, ALLOW),
            new StandardAcl(TOPIC, "foo", LITERAL, "User:*", "*", ALL, DENY),
            new StandardAcl(TOPIC, "foo", PREFIXED, "User:*", "*", DESCRIBE, ALLOW))) {
        StandardAclWithId aclWithId = withId(acl);
        authorizer.addAcl(aclWithId.id(), aclWithId.acl());
    }

    // "alice": every operation on the literal "foo" is denied; the prefixed READ
    // ALLOW still applies to "foobar" (no literal DENY matches it).
    assertEquals(Arrays.asList(DENIED, DENIED, DENIED, ALLOWED), authorizer.authorize(
        newRequestContext("alice"),
        Arrays.asList(
            newAction(WRITE, TOPIC, "foo"),
            newAction(READ, TOPIC, "foo"),
            newAction(DESCRIBE, TOPIC, "foo"),
            newAction(READ, TOPIC, "foobar"))));

    // "bob" only matches the wildcard-principal ACLs: denied on "foo", DESCRIBE
    // allowed on "foobaz" via the prefixed rule, READ on "foobaz" has no grant.
    assertEquals(Arrays.asList(DENIED, DENIED, DENIED, ALLOWED, DENIED), authorizer.authorize(
        newRequestContext("bob"),
        Arrays.asList(
            newAction(DESCRIBE, TOPIC, "foo"),
            newAction(READ, TOPIC, "foo"),
            newAction(WRITE, TOPIC, "foo"),
            newAction(DESCRIBE, TOPIC, "foobaz"),
            newAction(READ, TOPIC, "foobaz"))));
}

@Test
public void testTopicAclWithOperationAll() throws Exception {
StandardAuthorizer authorizer = new StandardAuthorizer();
authorizer.configure(Collections.emptyMap());
List<StandardAcl> acls = Arrays.asList(
Comment thread
hachikuji marked this conversation as resolved.
new StandardAcl(TOPIC, "foo", LITERAL, "User:*", "*", ALL, ALLOW),
new StandardAcl(TOPIC, "bar", PREFIXED, "User:alice", "*", ALL, ALLOW),
new StandardAcl(TOPIC, "baz", LITERAL, "User:bob", "*", ALL, ALLOW)
);

acls.forEach(acl -> {
StandardAclWithId aclWithId = withId(acl);
authorizer.addAcl(aclWithId.id(), aclWithId.acl());
});

assertEquals(Arrays.asList(ALLOWED, ALLOWED, DENIED), authorizer.authorize(
newRequestContext("alice"),
Arrays.asList(
newAction(WRITE, TOPIC, "foo"),
newAction(DESCRIBE_CONFIGS, TOPIC, "bar"),
newAction(DESCRIBE, TOPIC, "baz"))));

assertEquals(Arrays.asList(ALLOWED, DENIED, ALLOWED), authorizer.authorize(
newRequestContext("bob"),
Arrays.asList(
newAction(WRITE, TOPIC, "foo"),
newAction(READ, TOPIC, "bar"),
newAction(DESCRIBE, TOPIC, "baz"))));
Comment thread
hachikuji marked this conversation as resolved.

assertEquals(Arrays.asList(ALLOWED, DENIED, DENIED), authorizer.authorize(
newRequestContext("malory"),
Arrays.asList(
newAction(DESCRIBE, TOPIC, "foo"),
newAction(WRITE, TOPIC, "bar"),
newAction(READ, TOPIC, "baz"))));
}

// Builds a request context authenticated as the given User-type principal.
private AuthorizableRequestContext newRequestContext(String principal) throws Exception {
    MockAuthorizableRequestContext.Builder builder = new MockAuthorizableRequestContext.Builder();
    builder.setPrincipal(new KafkaPrincipal(USER_TYPE, principal));
    return builder.build();
}

private static StandardAuthorizer createAuthorizerWithManyAcls() {
Expand Down