From 6911a4c12eb243c83911dfb5cb1475475d0b32a9 Mon Sep 17 00:00:00 2001 From: Gantigmaa Selenge Date: Thu, 30 May 2024 15:16:56 +0100 Subject: [PATCH 1/4] KAFKA-16865: Add IncludeTopicAuthorizedOperations option for DescribeTopicPartitionsRequest --- .../org/apache/kafka/clients/admin/KafkaAdminClient.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 71d39900cd5a8..9ac60c01224c8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -2241,7 +2241,7 @@ void handleResponse(AbstractResponse abstractResponse) { continue; } - TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes); + TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes, options.includeAuthorizedOperations()); if (partiallyFinishedTopicDescription != null && partiallyFinishedTopicDescription.name().equals(topicName)) { // Add the partitions for the cursor topic of the previous batch. @@ -2404,14 +2404,16 @@ void handleFailure(Throwable throwable) { private TopicDescription getTopicDescriptionFromDescribeTopicsResponseTopic( DescribeTopicPartitionsResponseTopic topic, - Map nodes + Map nodes, + boolean includeAuthorizedOperations ) { List partitionInfos = topic.partitions(); List partitions = new ArrayList<>(partitionInfos.size()); for (DescribeTopicPartitionsResponsePartition partitionInfo : partitionInfos) { partitions.add(DescribeTopicPartitionsResponse.partitionToTopicPartitionInfo(partitionInfo, nodes)); } - return new TopicDescription(topic.name(), topic.isInternal(), partitions, validAclOperations(topic.topicAuthorizedOperations()), topic.topicId()); + Set authorisedOperations = includeAuthorizedOperations ? validAclOperations(topic.topicAuthorizedOperations()) : null; + return new TopicDescription(topic.name(), topic.isInternal(), partitions, authorisedOperations, topic.topicId()); } private TopicDescription getTopicDescriptionFromCluster(Cluster cluster, String topicName, Uuid topicId, From 3835aaddf8ef721c3d39e8f0be4acb462fb251a4 Mon Sep 17 00:00:00 2001 From: Gantigmaa Selenge Date: Tue, 11 Jun 2024 11:00:59 +0100 Subject: [PATCH 2/4] Add unit test --- .../clients/admin/KafkaAdminClientTest.java | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index ea1305e533bef..c3fefe8294a04 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -1464,6 +1464,7 @@ public void testDescribeTopicsWithDescribeTopicPartitionsApiBasic() { assertEquals(0, topicDescription.partitions().get(0).partition()); assertEquals(1, topicDescription.partitions().get(1).partition()); topicDescription = topicDescriptions.get(topicName1); + assertNull(topicDescription.authorizedOperations()); assertEquals(1, topicDescription.partitions().size()); } catch (Exception e) { fail("describe using DescribeTopics API should not fail", e); @@ -1471,6 +1472,46 @@ public void testDescribeTopicsWithDescribeTopicPartitionsApiBasic() { } } + @Test + public void testDescribeTopicPartitionsApiWithAuthorizedOps() { + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + String topicName0 = "test-0"; + Uuid topicId = Uuid.randomUuid(); + + int authorisedOperations = 1 << AclOperation.DESCRIBE.code() | 1 << AclOperation.ALTER.code(); + env.kafkaClient().prepareResponse( + prepareDescribeClusterResponse(0, + env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), + 2, + authorisedOperations) + ); + + DescribeTopicPartitionsResponseData responseData = new DescribeTopicPartitionsResponseData(); + responseData.topics().add(new DescribeTopicPartitionsResponseTopic() + .setErrorCode((short) 0) + .setTopicId(topicId) + .setName(topicName0) + .setIsInternal(false) + .setTopicAuthorizedOperations(authorisedOperations)); + env.kafkaClient().prepareResponse(new DescribeTopicPartitionsResponse(responseData)); + + try { + DescribeTopicsResult result = env.adminClient().describeTopics( + Arrays.asList(topicName0), new DescribeTopicsOptions().includeAuthorizedOperations(true) + ); + + Map topicDescriptions = result.allTopicNames().get(); + TopicDescription topicDescription = topicDescriptions.get(topicName0); + assertEquals(new HashSet<>(Arrays.asList(AclOperation.DESCRIBE, AclOperation.ALTER)), + topicDescription.authorizedOperations()); + } catch (Exception e) { + fail("describe using DescribeTopics API should not fail", e); + } + } + } + @SuppressWarnings({"NPathComplexity", "CyclomaticComplexity"}) @Test public void testDescribeTopicsWithDescribeTopicPartitionsApiEdgeCase() { @@ -1547,6 +1588,7 @@ public void testDescribeTopicsWithDescribeTopicPartitionsApiEdgeCase() { assertEquals(2, topicDescription.partitions().size()); topicDescription = topicDescriptions.get(topicName2); assertEquals(2, topicDescription.partitions().size()); + assertNull(topicDescription.authorizedOperations()); } catch (Exception e) { fail("describe using DescribeTopics API should not fail", e); } From c4bc52b114cee187fb87401b8df549201bdd2fba Mon Sep 17 00:00:00 2001 From: Gantigmaa Selenge Date: Tue, 11 Jun 2024 13:53:21 +0100 Subject: [PATCH 3/4] Address comments on unit tests --- .../clients/admin/KafkaAdminClientTest.java | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index c3fefe8294a04..80027607abeb3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -1473,13 +1473,13 @@ public void testDescribeTopicsWithDescribeTopicPartitionsApiBasic() { } @Test - public void testDescribeTopicPartitionsApiWithAuthorizedOps() { + public void testDescribeTopicPartitionsApiWithAuthorizedOps() throws ExecutionException, InterruptedException { try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); String topicName0 = "test-0"; Uuid topicId = Uuid.randomUuid(); - int authorisedOperations = 1 << AclOperation.DESCRIBE.code() | 1 << AclOperation.ALTER.code(); + int authorisedOperations = Utils.to32BitField(Utils.mkSet(AclOperation.DESCRIBE.code(), AclOperation.ALTER.code())); env.kafkaClient().prepareResponse( prepareDescribeClusterResponse(0, env.cluster().nodes(), @@ -1497,18 +1497,14 @@ public void testDescribeTopicPartitionsApiWithAuthorizedOps() { .setTopicAuthorizedOperations(authorisedOperations)); env.kafkaClient().prepareResponse(new DescribeTopicPartitionsResponse(responseData)); - try { - DescribeTopicsResult result = env.adminClient().describeTopics( - Arrays.asList(topicName0), new DescribeTopicsOptions().includeAuthorizedOperations(true) - ); + DescribeTopicsResult result = env.adminClient().describeTopics( + singletonList(topicName0), new DescribeTopicsOptions().includeAuthorizedOperations(true) + ); - Map topicDescriptions = result.allTopicNames().get(); - TopicDescription topicDescription = topicDescriptions.get(topicName0); - assertEquals(new HashSet<>(Arrays.asList(AclOperation.DESCRIBE, AclOperation.ALTER)), - topicDescription.authorizedOperations()); - } catch (Exception e) { - fail("describe using DescribeTopics API should not fail", e); - } + Map topicDescriptions = result.allTopicNames().get(); + TopicDescription topicDescription = topicDescriptions.get(topicName0); + assertEquals(new HashSet<>(asList(AclOperation.DESCRIBE, AclOperation.ALTER)), + topicDescription.authorizedOperations()); } } From 2b3537aa57ac029f432a7ac538e8148e7562e41c Mon Sep 17 00:00:00 2001 From: Gantigmaa Selenge Date: Wed, 12 Jun 2024 11:36:04 +0100 Subject: [PATCH 4/4] Address review comment for adding an unit test for filtering out auth options --- .../clients/admin/KafkaAdminClientTest.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 80027607abeb3..e04de635f7139 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -1508,6 +1508,41 @@ public void testDescribeTopicPartitionsApiWithAuthorizedOps() throws ExecutionEx } } + @Test + public void testDescribeTopicPartitionsApiWithoutAuthorizedOps() throws ExecutionException, InterruptedException { + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + String topicName0 = "test-0"; + Uuid topicId = Uuid.randomUuid(); + + int authorisedOperations = Utils.to32BitField(Utils.mkSet(AclOperation.DESCRIBE.code(), AclOperation.ALTER.code())); + env.kafkaClient().prepareResponse( + prepareDescribeClusterResponse(0, + env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), + 2, + authorisedOperations) + ); + + DescribeTopicPartitionsResponseData responseData = new DescribeTopicPartitionsResponseData(); + responseData.topics().add(new DescribeTopicPartitionsResponseTopic() + .setErrorCode((short) 0) + .setTopicId(topicId) + .setName(topicName0) + .setIsInternal(false) + .setTopicAuthorizedOperations(authorisedOperations)); + env.kafkaClient().prepareResponse(new DescribeTopicPartitionsResponse(responseData)); + + DescribeTopicsResult result = env.adminClient().describeTopics( + singletonList(topicName0), new DescribeTopicsOptions().includeAuthorizedOperations(false) + ); + + Map topicDescriptions = result.allTopicNames().get(); + TopicDescription topicDescription = topicDescriptions.get(topicName0); + assertNull(topicDescription.authorizedOperations()); + } + } + @SuppressWarnings({"NPathComplexity", "CyclomaticComplexity"}) @Test public void testDescribeTopicsWithDescribeTopicPartitionsApiEdgeCase() {