From 38362571dd1bd4661c455feb97d8b43086b8ff1f Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Mon, 12 Jul 2021 16:22:46 +0800 Subject: [PATCH 1/3] KAFKA-13062: refactor DeleteConsumerGroupsHandler and tests --- .../DeleteConsumerGroupsHandler.java | 58 ++++++++++++++----- .../clients/admin/KafkaAdminClientTest.java | 40 +++++++------ .../DeleteConsumerGroupsHandlerTest.java | 4 +- 3 files changed, 71 insertions(+), 31 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java index c5d220538129a..e8ff35dcb9329 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java @@ -18,7 +18,9 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -89,46 +91,76 @@ public ApiResult handleResponse( DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse; Map completed = new HashMap<>(); Map failed = new HashMap<>(); - List unmapped = new ArrayList<>(); + final Set groupsToUnmap = new HashSet<>(); + final Set groupsToRetry = new HashSet<>(); for (DeletableGroupResult deletedGroup : response.data().results()) { CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(deletedGroup.groupId()); Errors error = Errors.forCode(deletedGroup.errorCode()); if (error != Errors.NONE) { - handleError(groupIdKey, error, failed, unmapped); + handleError(groupIdKey, error, failed, groupsToUnmap, groupsToRetry); continue; } completed.put(groupIdKey, null); } - return new ApiResult<>(completed, failed, unmapped); + + if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { + return new ApiResult<>( + completed, + failed, + Collections.emptyList() + ); + } else { + // retry the request, so don't send completed/failed results back + return new ApiResult<>( + Collections.emptyMap(), + Collections.emptyMap(), + new ArrayList<>(groupsToUnmap) + ); + } } private void handleError( CoordinatorKey groupId, Errors error, Map failed, - List unmapped + Set groupsToUnmap, + Set groupsToRetry ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `DeleteConsumerGroups` response", groupId, - error.exception()); + log.error("Received authorization failure for group {} in `{}` response", groupId, + apiName(), error.exception()); + failed.put(groupId, error.exception()); + break; + case INVALID_GROUP_ID: + case NON_EMPTY_GROUP: + case GROUP_ID_NOT_FOUND: + log.error("Received non retriable failure for group {} in `{}` response", groupId, + apiName(), error.exception()); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: - case COORDINATOR_NOT_AVAILABLE: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`{}` request for group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", apiName(), groupId); + groupsToRetry.add(groupId); break; + case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("DeleteConsumerGroups request for group {} returned error {}. Will retry", - groupId, error); - unmapped.add(groupId); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`{}` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", apiName(), groupId, error); + groupsToUnmap.add(groupId); break; default: - log.error("Received unexpected error for group {} in `DeleteConsumerGroups` response", - groupId, error.exception()); + final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", + groupId, apiName()); + log.error(unexpectedErrorMsg, error.exception()); failed.put(groupId, error.exception()); } } -} +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 53e326ad8955f..53a3c6ef87925 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -3179,41 +3179,47 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { env.kafkaClient().prepareResponse( prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode())); - final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds); + DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds); TestUtils.assertFutureError(errorResult.deletedGroups().get("groupId"), GroupAuthorizationException.class); - //Retriable errors should be retried + // Retriable errors should be retried env.kafkaClient().prepareResponse( prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - final DeletableGroupResultCollection errorResponse1 = new DeletableGroupResultCollection(); - errorResponse1.add(new DeletableGroupResult() - .setGroupId("groupId") - .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) - ); - env.kafkaClient().prepareResponse(new DeleteGroupsResponse( - new DeleteGroupsResponseData() - .setResults(errorResponse1))); - - final DeletableGroupResultCollection errorResponse2 = new DeletableGroupResultCollection(); - errorResponse2.add(new DeletableGroupResult() + final DeletableGroupResultCollection errorResponse = new DeletableGroupResultCollection(); + errorResponse.add(new DeletableGroupResult() .setGroupId("groupId") .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) ); env.kafkaClient().prepareResponse(new DeleteGroupsResponse( new DeleteGroupsResponseData() - .setResults(errorResponse2))); + .setResults(errorResponse))); /* * We need to return two responses here, one for NOT_COORDINATOR call when calling delete a consumer group * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response */ - final DeletableGroupResultCollection coordinatorMoved = new DeletableGroupResultCollection(); + + DeletableGroupResultCollection coordinatorMoved = new DeletableGroupResultCollection(); coordinatorMoved.add(new DeletableGroupResult() .setGroupId("groupId") .setErrorCode(Errors.NOT_COORDINATOR.code()) ); + + env.kafkaClient().prepareResponse(new DeleteGroupsResponse( + new DeleteGroupsResponseData() + .setResults(coordinatorMoved))); + env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + coordinatorMoved = new DeletableGroupResultCollection(); + coordinatorMoved.add(new DeletableGroupResult() + .setGroupId("groupId") + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + ); + env.kafkaClient().prepareResponse(new DeleteGroupsResponse( new DeleteGroupsResponseData() .setResults(coordinatorMoved))); @@ -3223,9 +3229,9 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { new DeleteGroupsResponseData() .setResults(validResponse))); - final DeleteConsumerGroupsResult errorResult1 = env.adminClient().deleteConsumerGroups(groupIds); + errorResult = env.adminClient().deleteConsumerGroups(groupIds); - final KafkaFuture errorResults = errorResult1.deletedGroups().get("groupId"); + final KafkaFuture errorResults = errorResult.deletedGroups().get("groupId"); assertNull(errorResults.get()); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java index 30a65f4c8fcc6..8d3a2376fde9a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.message.DeleteGroupsResponseData; import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult; @@ -57,12 +58,12 @@ public void testSuccessfulHandleResponse() { @Test public void testUnmappedHandleResponse() { assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); + assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test public void testRetriableHandleResponse() { assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test @@ -70,6 +71,7 @@ public void testFailedHandleResponse() { assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED)); assertFailed(GroupIdNotFoundException.class, handleWithError(Errors.GROUP_ID_NOT_FOUND)); assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID)); + assertFailed(GroupNotEmptyException.class, handleWithError(Errors.NON_EMPTY_GROUP)); } private DeleteGroupsResponse buildResponse(Errors error) { From 6b5fc6501b13ff2092b89de91d9c4a1ecca26807 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 14 Jul 2021 11:11:54 +0800 Subject: [PATCH 2/3] KAFKA-13062: refactor code --- .../DeleteConsumerGroupsHandler.java | 50 +++++-------------- 1 file changed, 13 insertions(+), 37 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java index e8ff35dcb9329..06356c75e81bd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java @@ -18,7 +18,6 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -88,79 +87,56 @@ public ApiResult handleResponse( Set groupIds, AbstractResponse abstractResponse ) { - DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse; - Map completed = new HashMap<>(); - Map failed = new HashMap<>(); + final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse; + final Map completed = new HashMap<>(); + final Map failed = new HashMap<>(); final Set groupsToUnmap = new HashSet<>(); - final Set groupsToRetry = new HashSet<>(); for (DeletableGroupResult deletedGroup : response.data().results()) { CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(deletedGroup.groupId()); Errors error = Errors.forCode(deletedGroup.errorCode()); if (error != Errors.NONE) { - handleError(groupIdKey, error, failed, groupsToUnmap, groupsToRetry); + handleError(groupIdKey, error, failed, groupsToUnmap); continue; } completed.put(groupIdKey, null); } - if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { - return new ApiResult<>( - completed, - failed, - Collections.emptyList() - ); - } else { - // retry the request, so don't send completed/failed results back - return new ApiResult<>( - Collections.emptyMap(), - Collections.emptyMap(), - new ArrayList<>(groupsToUnmap) - ); - } + return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap)); } private void handleError( CoordinatorKey groupId, Errors error, Map failed, - Set groupsToUnmap, - Set groupsToRetry + Set groupsToUnmap ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `{}` response", groupId, - apiName(), error.exception()); - failed.put(groupId, error.exception()); - break; case INVALID_GROUP_ID: case NON_EMPTY_GROUP: case GROUP_ID_NOT_FOUND: - log.error("Received non retriable failure for group {} in `{}` response", groupId, - apiName(), error.exception()); + log.debug("`DeleteConsumerGroups` request for group id {} failed due to error {}", groupId, error); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry - log.debug("`{}` request for group {} failed because the coordinator " + - "is still in the process of loading state. Will retry", apiName(), groupId); - groupsToRetry.add(groupId); + log.debug("`DeleteConsumerGroups` request for group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", groupId); break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: // If the coordinator is unavailable or there was a coordinator change, then we unmap // the key so that we retry the `FindCoordinator` request - log.debug("`{}` request for group {} returned error {}. " + - "Will attempt to find the coordinator again and retry", apiName(), groupId, error); + log.debug("`DeleteConsumerGroups` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", groupId, error); groupsToUnmap.add(groupId); break; default: - final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", - groupId, apiName()); - log.error(unexpectedErrorMsg, error.exception()); + log.error("`DeleteConsumerGroups` request for group id {} failed due to unexpected error {}", groupId, error); failed.put(groupId, error.exception()); } } -} \ No newline at end of file +} From 54a364ea4cb35d39e858e1dc876fa19029816da1 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 14 Jul 2021 18:14:44 +0800 Subject: [PATCH 3/3] KAFKA-13062: address comments to refactor --- .../admin/internals/DeleteConsumerGroupsHandler.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java index 06356c75e81bd..2b42001b23c60 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java @@ -117,24 +117,24 @@ private void handleError( case INVALID_GROUP_ID: case NON_EMPTY_GROUP: case GROUP_ID_NOT_FOUND: - log.debug("`DeleteConsumerGroups` request for group id {} failed due to error {}", groupId, error); + log.debug("`DeleteConsumerGroups` request for group id {} failed due to error {}", groupId.idValue, error); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry - log.debug("`DeleteConsumerGroups` request for group {} failed because the coordinator " + - "is still in the process of loading state. Will retry", groupId); + log.debug("`DeleteConsumerGroups` request for group id {} failed because the coordinator " + + "is still in the process of loading state. Will retry", groupId.idValue); break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: // If the coordinator is unavailable or there was a coordinator change, then we unmap // the key so that we retry the `FindCoordinator` request - log.debug("`DeleteConsumerGroups` request for group {} returned error {}. " + - "Will attempt to find the coordinator again and retry", groupId, error); + log.debug("`DeleteConsumerGroups` request for group id {} returned error {}. " + + "Will attempt to find the coordinator again and retry", groupId.idValue, error); groupsToUnmap.add(groupId); break; default: - log.error("`DeleteConsumerGroups` request for group id {} failed due to unexpected error {}", groupId, error); + log.error("`DeleteConsumerGroups` request for group id {} failed due to unexpected error {}", groupId.idValue, error); failed.put(groupId, error.exception()); } }