From 303854428598c99623e8c36ae1d1d8a96b36fb7c Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Mon, 12 Jul 2021 18:05:55 +0800 Subject: [PATCH 1/4] KAFKA-13063: refactor DescribeConsumerGroupsHandler and tests --- .../DescribeConsumerGroupsHandler.java | 68 ++++++++++++++----- .../clients/admin/KafkaAdminClientTest.java | 28 ++++---- .../DescribeConsumerGroupsHandlerTest.java | 2 +- 3 files changed, 66 insertions(+), 32 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java index 8a94becef1a07..dd9db2b18bcbe 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java @@ -37,6 +37,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.message.DescribeGroupsRequestData; import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup; import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember; @@ -103,6 +104,12 @@ public DescribeGroupsRequest.Builder buildRequest(int coordinatorId, Set describedGroups) { + if (describedGroups.isEmpty()) { + throw new InvalidGroupIdException("No consumer group found"); + } + } + @Override public ApiResult handleResponse( Node coordinator, @@ -112,13 +119,17 @@ public ApiResult handleResponse( DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse; Map completed = new HashMap<>(); Map failed = new HashMap<>(); - List unmapped = new ArrayList<>(); + final Set groupsToUnmap = new HashSet<>(); + final Set groupsToRetry = new HashSet<>(); + + List describedGroups = response.data().groups(); + validateGroupsNotEmpty(describedGroups); - for (DescribedGroup describedGroup : response.data().groups()) { + for (DescribedGroup describedGroup : describedGroups) { CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(describedGroup.groupId()); Errors error = Errors.forCode(describedGroup.errorCode()); if (error != Errors.NONE) { - handleError(groupIdKey, error, failed, unmapped); + handleError(groupIdKey, error, failed, groupsToUnmap, groupsToRetry); continue; } final String protocolType = describedGroup.protocolType(); @@ -151,38 +162,59 @@ public ApiResult handleResponse( completed.put(groupIdKey, consumerGroupDescription); } else { failed.put(groupIdKey, new IllegalArgumentException( - String.format("GroupId %s is not a consumer group (%s).", - groupIdKey.idValue, protocolType))); + String.format("GroupId %s is not a consumer group (%s).", + groupIdKey.idValue, protocolType))); } } - return new ApiResult<>(completed, failed, unmapped); + + if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { + return new ApiResult<>( + completed, + failed, + Collections.emptyList() + ); + } else { + // retry the request, so don't send completed/failed results back + return new ApiResult<>( + Collections.emptyMap(), + Collections.emptyMap(), + new ArrayList<>(groupsToUnmap) + ); + } } private void handleError( CoordinatorKey groupId, Errors error, Map failed, - List unmapped + Set groupsToUnmap, + Set groupsToRetry ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `DescribeGroups` response", groupId, - error.exception()); + log.error("Received authorization failure for group {} in `{}` response", groupId, + apiName(), error.exception()); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: - case COORDINATOR_NOT_AVAILABLE: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`{}` request for group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", apiName(), groupId); + groupsToRetry.add(groupId); break; + case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("DescribeGroups request for group {} returned error {}. Will retry", - groupId, error); - unmapped.add(groupId); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`{}` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", apiName(), groupId, error); + groupsToUnmap.add(groupId); break; default: - log.error("Received unexpected error for group {} in `DescribeGroups` response", - groupId, error.exception()); - failed.put(groupId, error.exception( - "Received unexpected error for group " + groupId + " in `DescribeGroups` response")); + final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", + groupId, apiName()); + log.error(unexpectedErrorMsg, error.exception()); + failed.put(groupId, error.exception(unexpectedErrorMsg)); } } @@ -199,4 +231,4 @@ private Set validAclOperations(final int authorizedOperations) { .collect(Collectors.toSet()); } -} +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 53e326ad8955f..68b124c6d65f1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -2688,8 +2688,7 @@ public void testDescribeConsumerGroups() throws Exception { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - //Retriable FindCoordinatorResponse errors should be retried - env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); + // Retriable FindCoordinatorResponse errors should be retried env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); @@ -2707,21 +2706,12 @@ public void testDescribeConsumerGroups() throws Exception { Collections.emptySet())); env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); - data = new DescribeGroupsResponseData(); - data.groups().add(DescribeGroupsResponse.groupMetadata( - GROUP_ID, - Errors.COORDINATOR_NOT_AVAILABLE, - "", - "", - "", - Collections.emptyList(), - Collections.emptySet())); - env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); - /* * We need to return two responses here, one with NOT_COORDINATOR error when calling describe consumer group * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * And the same reason for COORDINATOR_NOT_AVAILABLE error response */ data = new DescribeGroupsResponseData(); data.groups().add(DescribeGroupsResponse.groupMetadata( @@ -2735,6 +2725,18 @@ public void testDescribeConsumerGroups() throws Exception { env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + data = new DescribeGroupsResponseData(); + data.groups().add(DescribeGroupsResponse.groupMetadata( + GROUP_ID, + Errors.COORDINATOR_NOT_AVAILABLE, + "", + "", + "", + Collections.emptyList(), + Collections.emptySet())); + env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + data = new DescribeGroupsResponseData(); TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0); TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java index fe26043613773..aef207aca6a75 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java @@ -104,13 +104,13 @@ public void testSuccessfulHandleResponse() { @Test public void testUnmappedHandleResponse() { + assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE, "")); assertUnmapped(handleWithError(Errors.NOT_COORDINATOR, "")); } @Test public void testRetriableHandleResponse() { assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS, "")); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE, "")); } @Test From e9a186873ac6feacdcc1e1089bd99a0745777db1 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 14 Jul 2021 11:22:42 +0800 Subject: [PATCH 2/4] KAFKA-13063: refactor codes --- .../DescribeConsumerGroupsHandler.java | 29 +++---------------- 1 file changed, 4 insertions(+), 25 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java index dd9db2b18bcbe..eb3adf6ec3e7a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java @@ -37,7 +37,6 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.acl.AclOperation; -import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.message.DescribeGroupsRequestData; import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup; import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember; @@ -104,26 +103,19 @@ public DescribeGroupsRequest.Builder buildRequest(int coordinatorId, Set describedGroups) { - if (describedGroups.isEmpty()) { - throw new InvalidGroupIdException("No consumer group found"); - } - } - @Override public ApiResult handleResponse( Node coordinator, Set groupIds, AbstractResponse abstractResponse ) { - DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse; - Map completed = new HashMap<>(); - Map failed = new HashMap<>(); + final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse; + final Map completed = new HashMap<>(); + final Map failed = new HashMap<>(); final Set groupsToUnmap = new HashSet<>(); final Set groupsToRetry = new HashSet<>(); List describedGroups = response.data().groups(); - validateGroupsNotEmpty(describedGroups); for (DescribedGroup describedGroup : describedGroups) { CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(describedGroup.groupId()); @@ -167,20 +159,7 @@ public ApiResult handleResponse( } } - if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { - return new ApiResult<>( - completed, - failed, - Collections.emptyList() - ); - } else { - // retry the request, so don't send completed/failed results back - return new ApiResult<>( - Collections.emptyMap(), - Collections.emptyMap(), - new ArrayList<>(groupsToUnmap) - ); - } + return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap)); } private void handleError( From 816bc07a6e0c7d3f8eca9e06803dc91ff031335a Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 14 Jul 2021 18:20:59 +0800 Subject: [PATCH 3/4] KAFKA-13063: refactor --- .../DescribeConsumerGroupsHandler.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java index eb3adf6ec3e7a..48a29617f35e6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java @@ -171,28 +171,27 @@ private void handleError( ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `{}` response", groupId, - apiName(), error.exception()); + log.debug("`DescribeGroups` request for group id {} failed due to error {}", groupId.idValue, error); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry - log.debug("`{}` request for group {} failed because the coordinator " + - "is still in the process of loading state. Will retry", apiName(), groupId); + log.debug("`DescribeGroups` request for group id {} failed because the coordinator " + + "is still in the process of loading state. Will retry", groupId.idValue); groupsToRetry.add(groupId); break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: // If the coordinator is unavailable or there was a coordinator change, then we unmap // the key so that we retry the `FindCoordinator` request - log.debug("`{}` request for group {} returned error {}. " + - "Will attempt to find the coordinator again and retry", apiName(), groupId, error); + log.debug("`DescribeGroups` request for group id {} returned error {}. " + + "Will attempt to find the coordinator again and retry", groupId.idValue, error); groupsToUnmap.add(groupId); break; default: - final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", - groupId, apiName()); - log.error(unexpectedErrorMsg, error.exception()); + final String unexpectedErrorMsg = + String.format("`DescribeGroups` request for group id %s failed due to error %s", groupId.idValue, error); + log.error(unexpectedErrorMsg); failed.put(groupId, error.exception(unexpectedErrorMsg)); } } From 07ef9bffc4ca48aef965ed0558c6503ab45f4e31 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Thu, 15 Jul 2021 10:18:07 +0800 Subject: [PATCH 4/4] KAFKA-13063: revert find coordinator response change --- .../DescribeConsumerGroupsHandler.java | 21 +++++++------------ .../clients/admin/KafkaAdminClientTest.java | 1 + 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java index 48a29617f35e6..10756a6069fee 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java @@ -113,15 +113,12 @@ public ApiResult handleResponse( final Map completed = new HashMap<>(); final Map failed = new HashMap<>(); final Set groupsToUnmap = new HashSet<>(); - final Set groupsToRetry = new HashSet<>(); - List describedGroups = response.data().groups(); - - for (DescribedGroup describedGroup : describedGroups) { + for (DescribedGroup describedGroup : response.data().groups()) { CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(describedGroup.groupId()); Errors error = Errors.forCode(describedGroup.errorCode()); if (error != Errors.NONE) { - handleError(groupIdKey, error, failed, groupsToUnmap, groupsToRetry); + handleError(groupIdKey, error, failed, groupsToUnmap); continue; } final String protocolType = describedGroup.protocolType(); @@ -159,15 +156,14 @@ public ApiResult handleResponse( } } - return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap)); + return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap)); } private void handleError( CoordinatorKey groupId, Errors error, Map failed, - Set groupsToUnmap, - Set groupsToRetry + Set groupsToUnmap ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: @@ -178,7 +174,6 @@ private void handleError( // If the coordinator is in the middle of loading, then we just need to retry log.debug("`DescribeGroups` request for group id {} failed because the coordinator " + "is still in the process of loading state. Will retry", groupId.idValue); - groupsToRetry.add(groupId); break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: @@ -189,10 +184,8 @@ private void handleError( groupsToUnmap.add(groupId); break; default: - final String unexpectedErrorMsg = - String.format("`DescribeGroups` request for group id %s failed due to error %s", groupId.idValue, error); - log.error(unexpectedErrorMsg); - failed.put(groupId, error.exception(unexpectedErrorMsg)); + log.error("`DescribeGroups` request for group id {} failed due to unexpected error {}", groupId.idValue, error); + failed.put(groupId, error.exception()); } } @@ -209,4 +202,4 @@ private Set validAclOperations(final int authorizedOperations) { .collect(Collectors.toSet()); } -} \ No newline at end of file +} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 68b124c6d65f1..74e59b92f2306 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -2689,6 +2689,7 @@ public void testDescribeConsumerGroups() throws Exception { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); // Retriable FindCoordinatorResponse errors should be retried + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));