diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index b7e482720b00e..a2f8b0a55ef1f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -2656,6 +2656,7 @@ CreateAclsRequest.Builder createRequest(int timeoutMs) { @Override void handleResponse(AbstractResponse abstractResponse) { + handleNotControllerError(abstractResponse); CreateAclsResponse response = (CreateAclsResponse) abstractResponse; List responses = response.results(); Iterator iter = responses.iterator(); @@ -2708,6 +2709,7 @@ DeleteAclsRequest.Builder createRequest(int timeoutMs) { @Override void handleResponse(AbstractResponse abstractResponse) { + handleNotControllerError(abstractResponse); DeleteAclsResponse response = (DeleteAclsResponse) abstractResponse; List results = response.filterResults(); Iterator iter = results.iterator(); @@ -2926,6 +2928,7 @@ public IncrementalAlterConfigsRequest.Builder createRequest(int timeoutMs) { @Override public void handleResponse(AbstractResponse abstractResponse) { + handleNotControllerError(abstractResponse); IncrementalAlterConfigsResponse response = (IncrementalAlterConfigsResponse) abstractResponse; Map errors = IncrementalAlterConfigsResponse.fromResponseData(response.data()); for (Map.Entry> entry : futures.entrySet()) { @@ -4081,8 +4084,11 @@ void handleFailure(Throwable throwable) { } private void handleNotControllerError(AbstractResponse response) throws ApiException { + // When sending requests directly to the follower controller, it might return NOT_LEADER_OR_FOLLOWER error. if (response.errorCounts().containsKey(Errors.NOT_CONTROLLER)) { handleNotControllerError(Errors.NOT_CONTROLLER); + } else if (metadataManager.usingBootstrapControllers() && response.errorCounts().containsKey(Errors.NOT_LEADER_OR_FOLLOWER)) { + handleNotControllerError(Errors.NOT_LEADER_OR_FOLLOWER); } } @@ -4644,6 +4650,7 @@ DescribeQuorumRequest.Builder createRequest(int timeoutMs) { @Override void handleResponse(AbstractResponse response) { + handleNotControllerError(response); final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response; if (quorumResponse.data().errorCode() != Errors.NONE.code()) { throw Errors.forCode(quorumResponse.data().errorCode()).exception(quorumResponse.data().errorMessage()); @@ -4841,6 +4848,7 @@ AddRaftVoterRequest.Builder createRequest(int timeoutMs) { @Override void handleResponse(AbstractResponse response) { + handleNotControllerError(response); AddRaftVoterResponse addResponse = (AddRaftVoterResponse) response; if (addResponse.data().errorCode() != Errors.NONE.code()) { ApiError error = new ApiError( @@ -4885,6 +4893,7 @@ RemoveRaftVoterRequest.Builder createRequest(int timeoutMs) { @Override void handleResponse(AbstractResponse response) { + handleNotControllerError(response); RemoveRaftVoterResponse addResponse = (RemoveRaftVoterResponse) response; if (addResponse.data().errorCode() != Errors.NONE.code()) { ApiError error = new ApiError( diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java index 99c823e78f487..add2256b2b3cb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java @@ -73,7 +73,8 @@ public AdminClientUnitTestEnv(Time time, Cluster cluster, Map co AdminMetadataManager metadataManager = new AdminMetadataManager(new LogContext(), adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG), - adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false); + adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), + config.containsKey(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG)); this.mockClient = new MockClient(time, new MockClient.MockMetadataUpdater() { @Override public List fetchNodes() { diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 1d1ae3e884b49..f331c593816df 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -1505,7 +1505,8 @@ public void testDescribeTopicsWithDescribeTopicPartitionsApiBasic() throws Execu env.cluster().nodes(), env.cluster().clusterResource().clusterId(), 2, - MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) + MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED, + false) ); DescribeTopicPartitionsResponseData dataFirstPart = new DescribeTopicPartitionsResponseData(); @@ -1562,7 +1563,8 @@ public void testDescribeTopicPartitionsApiWithAuthorizedOps() throws ExecutionEx env.cluster().nodes(), env.cluster().clusterResource().clusterId(), 2, - authorisedOperations) + authorisedOperations, + false) ); DescribeTopicPartitionsResponseData responseData = new DescribeTopicPartitionsResponseData(); @@ -1598,7 +1600,8 @@ public void testDescribeTopicPartitionsApiWithoutAuthorizedOps() throws Executio env.cluster().nodes(), env.cluster().clusterResource().clusterId(), 2, - authorisedOperations) + authorisedOperations, + false) ); DescribeTopicPartitionsResponseData responseData = new DescribeTopicPartitionsResponseData(); @@ -1638,7 +1641,8 @@ public void testDescribeTopicsWithDescribeTopicPartitionsApiEdgeCase() throws Ex env.cluster().nodes(), env.cluster().clusterResource().clusterId(), 2, - MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) + MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED, + false) ); DescribeTopicPartitionsResponseData dataFirstPart = new DescribeTopicPartitionsResponseData(); @@ -1738,7 +1742,8 @@ public void testDescribeTopicsWithDescribeTopicPartitionsApiErrorHandling() thro env.cluster().nodes(), env.cluster().clusterResource().clusterId(), 2, - MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) + MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED, + false) ); DescribeTopicPartitionsResponseData dataFirstPart = new DescribeTopicPartitionsResponseData(); @@ -1878,6 +1883,71 @@ public void testDescribeAcls() throws Exception { } } + @Test + public void testCreateAclsToController() throws Exception { + try (AdminClientUnitTestEnv env = mockClientEnv(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "dummy")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + + env.kafkaClient().prepareResponse(new CreateAclsResponse(new CreateAclsResponseData().setResults(asList( + new CreateAclsResponseData.AclCreationResult() + .setErrorCode(Errors.NOT_CONTROLLER.code()) + .setErrorMessage("not controller"))))); + // should retry the describe cluster to update the metadata + env.kafkaClient().prepareResponse( + prepareDescribeClusterResponse(0, + env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), + 2, + MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED, + true) + ); + + // Test a call where we successfully create two ACLs. + env.kafkaClient().prepareResponse(new CreateAclsResponse(new CreateAclsResponseData().setResults(asList( + new CreateAclsResponseData.AclCreationResult())))); + + CreateAclsResult results = env.adminClient().createAcls(asList(ACL1)); + assertCollectionIs(results.values().keySet(), ACL1); + for (KafkaFuture future : results.values().values()) + future.get(); + results.all().get(); + } + } + + @Test + public void testDeleteAclsToController() throws Exception { + try (AdminClientUnitTestEnv env = mockClientEnv(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "dummy")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(new DeleteAclsResponse(new DeleteAclsResponseData() + .setThrottleTimeMs(0) + .setFilterResults(asList(new DeleteAclsResponseData.DeleteAclsFilterResult() + .setErrorCode(Errors.NOT_CONTROLLER.code()) + .setErrorMessage("not controller"))), + ApiKeys.DELETE_ACLS.latestVersion())); + // should retry the describe cluster to update the metadata + env.kafkaClient().prepareResponse( + prepareDescribeClusterResponse(0, + env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), + 2, + MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED, + true) + ); + // Test a call where there are no errors. + env.kafkaClient().prepareResponse(new DeleteAclsResponse(new DeleteAclsResponseData() + .setThrottleTimeMs(0) + .setFilterResults(asList( + new DeleteAclsResponseData.DeleteAclsFilterResult() + .setMatchingAcls(singletonList(DeleteAclsResponse.matchingAcl(ACL1, ApiError.NONE))))), + ApiKeys.DELETE_ACLS.latestVersion())); + DeleteAclsResult results = env.adminClient().deleteAcls(asList(FILTER1)); + Collection deleted = results.all().get(); + assertCollectionIs(deleted, ACL1); + } + } + @Test public void testCreateAcls() throws Exception { try (AdminClientUnitTestEnv env = mockClientEnv()) { @@ -2801,7 +2871,8 @@ public void testDescribeCluster() throws Exception { env.cluster().nodes(), env.cluster().clusterResource().clusterId(), 2, - MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED)); + MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED, + false)); // Prepare the describe cluster response used for the second describe cluster env.kafkaClient().prepareResponse( @@ -2809,7 +2880,8 @@ public void testDescribeCluster() throws Exception { env.cluster().nodes(), env.cluster().clusterResource().clusterId(), 3, - 1 << AclOperation.DESCRIBE.code() | 1 << AclOperation.ALTER.code())); + 1 << AclOperation.DESCRIBE.code() | 1 << AclOperation.ALTER.code(), + false)); // Test DescribeCluster with the authorized operations omitted. final DescribeClusterResult result = env.adminClient().describeCluster(); @@ -2858,7 +2930,8 @@ private static DescribeClusterResponse prepareDescribeClusterResponse( Collection brokers, String clusterId, int controllerId, - int clusterAuthorizedOperations + int clusterAuthorizedOperations, + boolean sentToController ) { DescribeClusterResponseData data = new DescribeClusterResponseData() .setErrorCode(Errors.NONE.code()) @@ -2867,6 +2940,10 @@ private static DescribeClusterResponse prepareDescribeClusterResponse( .setClusterId(clusterId) .setClusterAuthorizedOperations(clusterAuthorizedOperations); + if (sentToController) { + data.setEndpointType(EndpointType.CONTROLLER.id()); + } + brokers.forEach(broker -> data.brokers().add(new DescribeClusterBroker() .setHost(broker.host()) @@ -5678,6 +5755,51 @@ public void testIncrementalAlterConfigs() throws Exception { } } + @Test + public void testIncrementalAlterConfigsToController() throws Exception { + try (AdminClientUnitTestEnv env = mockClientEnv(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "dummy")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + //test NOT_CONTROLLER error scenarios + IncrementalAlterConfigsResponseData responseData = new IncrementalAlterConfigsResponseData(); + responseData.responses().add(new AlterConfigsResourceResponse() + .setResourceName("") + .setResourceType(ConfigResource.Type.BROKER.id()) + .setErrorCode(Errors.NOT_CONTROLLER.code()) + .setErrorMessage("not controller")); + + env.kafkaClient().prepareResponse(new IncrementalAlterConfigsResponse(responseData)); + + // should retry the describe cluster to update the metadata + env.kafkaClient().prepareResponse( + prepareDescribeClusterResponse(0, + env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), + 2, + MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED, + true) + ); + + IncrementalAlterConfigsResponseData responseData2 = new IncrementalAlterConfigsResponseData(); + responseData2.responses().add(new AlterConfigsResourceResponse() + .setResourceName("") + .setResourceType(ConfigResource.Type.BROKER.id()) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(ApiError.NONE.message())); + + ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, ""); + + AlterConfigOp alterConfigOp1 = new AlterConfigOp( + new ConfigEntry("log.segment.bytes", "1073741"), + AlterConfigOp.OpType.SET); + + final Map> configs = new HashMap<>(); + configs.put(brokerResource, singletonList(alterConfigOp1)); + env.kafkaClient().prepareResponse(new IncrementalAlterConfigsResponse(responseData2)); + env.adminClient().incrementalAlterConfigs(configs).all().get(); + } + } + @Test public void testRemoveMembersFromGroupNumRetries() throws Exception { final Cluster cluster = mockCluster(3, 0); @@ -8889,7 +9011,7 @@ boolean callHasExpired(KafkaAdminClient.Call call) { @ParameterizedTest @CsvSource({ "false, false", "false, true", "true, false", "true, true" }) public void testAddRaftVoterRequest(boolean fail, boolean sendClusterId) throws Exception { - try (AdminClientUnitTestEnv env = mockClientEnv()) { + try (AdminClientUnitTestEnv env = mockClientEnv(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "dummy")) { AddRaftVoterResponseData responseData = new AddRaftVoterResponseData(); if (fail) { responseData. @@ -8930,13 +9052,52 @@ public void testAddRaftVoterRequest(boolean fail, boolean sendClusterId) throws setName("CONTROLLER"). setHost("example.com"). setPort(8080), requestData.get().listeners().find("CONTROLLER")); + + // In the fail case, we continue to test the `NOT_LEADER_OR_FOLLOWER` error case + if (fail && !sendClusterId) { + responseData. + setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()). + setErrorMessage("test"); + env.kafkaClient().prepareResponse( + request -> { + if (!(request instanceof AddRaftVoterRequest)) return false; + requestData.set((AddRaftVoterRequestData) request.data()); + return true; + }, + new AddRaftVoterResponse(responseData)); + + // should retry the describe cluster to update the metadata + env.kafkaClient().prepareResponse( + prepareDescribeClusterResponse(0, + env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), + 2, + MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED, + true) + ); + + AddRaftVoterResponseData responseData2 = new AddRaftVoterResponseData(); + env.kafkaClient().prepareResponse( + request -> { + if (!(request instanceof AddRaftVoterRequest)) return false; + requestData.set((AddRaftVoterRequestData) request.data()); + return true; + }, + new AddRaftVoterResponse(responseData2)); + + AddRaftVoterResult result2 = env.adminClient().addRaftVoter(1, + Uuid.fromString("YAfa4HClT3SIIW2klIUspg"), + Collections.singleton(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)), + options); + result2.all().get(); + } } } @ParameterizedTest @CsvSource({ "false, false", "false, true", "true, false", "true, true" }) public void testRemoveRaftVoterRequest(boolean fail, boolean sendClusterId) throws Exception { - try (AdminClientUnitTestEnv env = mockClientEnv()) { + try (AdminClientUnitTestEnv env = mockClientEnv(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "dummy")) { RemoveRaftVoterResponseData responseData = new RemoveRaftVoterResponseData(); if (fail) { responseData. @@ -8971,6 +9132,44 @@ public void testRemoveRaftVoterRequest(boolean fail, boolean sendClusterId) thro } assertEquals(1, requestData.get().voterId()); assertEquals(Uuid.fromString("YAfa4HClT3SIIW2klIUspg"), requestData.get().voterDirectoryId()); + + // In the fail case, we continue to test the `NOT_LEADER_OR_FOLLOWER` error case + if (fail && !sendClusterId) { + responseData. + setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()). + setErrorMessage("test"); + env.kafkaClient().prepareResponse( + request -> { + if (!(request instanceof RemoveRaftVoterRequest)) return false; + requestData.set((RemoveRaftVoterRequestData) request.data()); + return true; + }, + new RemoveRaftVoterResponse(responseData)); + + // should retry the describe cluster to update the metadata + env.kafkaClient().prepareResponse( + prepareDescribeClusterResponse(0, + env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), + 2, + MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED, + true) + ); + + RemoveRaftVoterResponseData responseData2 = new RemoveRaftVoterResponseData(); + env.kafkaClient().prepareResponse( + request -> { + if (!(request instanceof RemoveRaftVoterRequest)) return false; + requestData.set((RemoveRaftVoterRequestData) request.data()); + return true; + }, + new RemoveRaftVoterResponse(responseData2)); + + RemoveRaftVoterResult result2 = env.adminClient().removeRaftVoter(1, + Uuid.fromString("YAfa4HClT3SIIW2klIUspg"), + options); + result2.all().get(); + } } } } diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 1d3021b0bbcf7..8b24043952c77 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -876,9 +876,12 @@ class KRaftClusterTest { } } - def createAdminClient(cluster: KafkaClusterTestKit): Admin = { + def createAdminClient(cluster: KafkaClusterTestKit, bootstrapController: Boolean): Admin = { var props: Properties = null - props = cluster.clientProperties() + props = if (bootstrapController) + cluster.newClientPropertiesBuilder().setUsingBootstrapControllers(true).build() + else + cluster.clientProperties() props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName) Admin.create(props) } @@ -896,7 +899,7 @@ class KRaftClusterTest { TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING, "Broker Never started up") } - val admin = createAdminClient(cluster) + val admin = createAdminClient(cluster, bootstrapController = false) try { val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions) val quorumInfo = quorumState.quorumInfo.get() @@ -940,6 +943,47 @@ class KRaftClusterTest { } } + @Test + def testDescribeQuorumRequestToControllers() : Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setNumBrokerNodes(4). + setNumControllerNodes(3).build()).build() + try { + cluster.format() + cluster.startup() + for (i <- 0 to 3) { + TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING, + "Broker Never started up") + } + val admin = createAdminClient(cluster, bootstrapController = true) + try { + val quorumInfo = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions).quorumInfo.get() + + assertEquals(cluster.controllers.asScala.keySet, quorumInfo.voters.asScala.map(_.replicaId).toSet) + assertTrue(cluster.controllers.asScala.keySet.contains(quorumInfo.leaderId), + s"Leader ID ${quorumInfo.leaderId} was not a controller ID.") + + // Try to bring down the raft client in the active controller node to force the leader election. + cluster.controllers().get(quorumInfo.leaderId).sharedServer.raftManager.client.shutdown(1000) + // Send another describe metadata quorum request, it'll get NOT_LEADER_OR_FOLLOWER error first and then re-retrieve the metadata update + // and send to the correct active controller. + val quorumInfo2 = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions) + .quorumInfo().get() + // Make sure the leader has changed + assertTrue(quorumInfo.leaderId() != quorumInfo2.leaderId()) + + assertEquals(cluster.controllers.asScala.keySet, quorumInfo.voters.asScala.map(_.replicaId).toSet) + assertTrue(cluster.controllers.asScala.keySet.contains(quorumInfo.leaderId), + s"Leader ID ${quorumInfo.leaderId} was not a controller ID.") + } finally { + admin.close() + } + } finally { + cluster.close() + } + } + @Test def testUpdateMetadataVersion(): Unit = { val cluster = new KafkaClusterTestKit.Builder(