Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2656,6 +2656,7 @@ CreateAclsRequest.Builder createRequest(int timeoutMs) {

@Override
void handleResponse(AbstractResponse abstractResponse) {
handleNotControllerError(abstractResponse);
CreateAclsResponse response = (CreateAclsResponse) abstractResponse;
List<AclCreationResult> responses = response.results();
Iterator<AclCreationResult> iter = responses.iterator();
Expand Down Expand Up @@ -2708,6 +2709,7 @@ DeleteAclsRequest.Builder createRequest(int timeoutMs) {

@Override
void handleResponse(AbstractResponse abstractResponse) {
handleNotControllerError(abstractResponse);
DeleteAclsResponse response = (DeleteAclsResponse) abstractResponse;
List<DeleteAclsResponseData.DeleteAclsFilterResult> results = response.filterResults();
Iterator<DeleteAclsResponseData.DeleteAclsFilterResult> iter = results.iterator();
Expand Down Expand Up @@ -2926,6 +2928,7 @@ public IncrementalAlterConfigsRequest.Builder createRequest(int timeoutMs) {

@Override
public void handleResponse(AbstractResponse abstractResponse) {
handleNotControllerError(abstractResponse);
IncrementalAlterConfigsResponse response = (IncrementalAlterConfigsResponse) abstractResponse;
Map<ConfigResource, ApiError> errors = IncrementalAlterConfigsResponse.fromResponseData(response.data());
for (Map.Entry<ConfigResource, KafkaFutureImpl<Void>> entry : futures.entrySet()) {
Expand Down Expand Up @@ -4081,8 +4084,11 @@ void handleFailure(Throwable throwable) {
}

private void handleNotControllerError(AbstractResponse response) throws ApiException {
// When sending requests directly to the follower controller, it might return NOT_LEADER_OR_FOLLOWER error.
if (response.errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
handleNotControllerError(Errors.NOT_CONTROLLER);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we pass NOT_LEADER_OR_FOLLOWER instead of NOT_CONTROLLER when it encounters the error NOT_LEADER_OR_FOLLOWER?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. Updated.

} else if (metadataManager.usingBootstrapControllers() && response.errorCounts().containsKey(Errors.NOT_LEADER_OR_FOLLOWER)) {
handleNotControllerError(Errors.NOT_LEADER_OR_FOLLOWER);
}
}

Expand Down Expand Up @@ -4644,6 +4650,7 @@ DescribeQuorumRequest.Builder createRequest(int timeoutMs) {

@Override
void handleResponse(AbstractResponse response) {
handleNotControllerError(response);
final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response;
if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
throw Errors.forCode(quorumResponse.data().errorCode()).exception(quorumResponse.data().errorMessage());
Expand Down Expand Up @@ -4841,6 +4848,7 @@ AddRaftVoterRequest.Builder createRequest(int timeoutMs) {

@Override
void handleResponse(AbstractResponse response) {
handleNotControllerError(response);
AddRaftVoterResponse addResponse = (AddRaftVoterResponse) response;
if (addResponse.data().errorCode() != Errors.NONE.code()) {
ApiError error = new ApiError(
Expand Down Expand Up @@ -4885,6 +4893,7 @@ RemoveRaftVoterRequest.Builder createRequest(int timeoutMs) {

@Override
void handleResponse(AbstractResponse response) {
handleNotControllerError(response);
RemoveRaftVoterResponse addResponse = (RemoveRaftVoterResponse) response;
if (addResponse.data().errorCode() != Errors.NONE.code()) {
ApiError error = new ApiError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String, Object> co

AdminMetadataManager metadataManager = new AdminMetadataManager(new LogContext(),
adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false);
adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG),
config.containsKey(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG));
this.mockClient = new MockClient(time, new MockClient.MockMetadataUpdater() {
@Override
public List<Node> fetchNodes() {
Expand Down
Loading