diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java index 240516d4ccdfe..bfcf1ab69b63f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -69,8 +70,18 @@ public AdminApiLookupStrategy lookupStrategy() { return lookupStrategy; } + private void validateKeys( + Set groupIds + ) { + if (!groupIds.equals(Collections.singleton(groupId))) { + throw new IllegalArgumentException("Received unexpected group ids " + groupIds + + " (expected only " + Collections.singleton(groupId) + ")"); + } + } + @Override - public OffsetFetchRequest.Builder buildRequest(int coordinatorId, Set keys) { + public OffsetFetchRequest.Builder buildRequest(int coordinatorId, Set groupIds) { + validateKeys(groupIds); // Set the flag to false as for admin client request, // we don't need to wait for any pending offset state to clear. 
return new OffsetFetchRequest.Builder(groupId.idValue, false, partitions, false); @@ -82,14 +93,19 @@ public ApiResult> handleR Set groupIds, AbstractResponse abstractResponse ) { + validateKeys(groupIds); + final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse; - Map> completed = new HashMap<>(); - Map failed = new HashMap<>(); - List unmapped = new ArrayList<>(); - Errors responseError = response.groupLevelError(groupId.idValue); - if (responseError != Errors.NONE) { - handleError(groupId, responseError, failed, unmapped); + // the groupError will contain the group level error for v0-v8 OffsetFetchResponse + Errors groupError = response.groupLevelError(groupId.idValue); + if (groupError != Errors.NONE) { + final Map failed = new HashMap<>(); + final Set groupsToUnmap = new HashSet<>(); + + handleGroupError(groupId, groupError, failed, groupsToUnmap); + + return new ApiResult<>(Collections.emptyMap(), failed, new ArrayList<>(groupsToUnmap)); } else { final Map groupOffsetsListing = new HashMap<>(); Map partitionDataMap = @@ -113,38 +129,44 @@ public ApiResult> handleR log.warn("Skipping return offset for {} due to error {}.", topicPartition, error); } } - completed.put(groupId, groupOffsetsListing); + + return new ApiResult<>( + Collections.singletonMap(groupId, groupOffsetsListing), + Collections.emptyMap(), + Collections.emptyList() + ); } - return new ApiResult<>(completed, failed, unmapped); } - private void handleError( + private void handleGroupError( CoordinatorKey groupId, Errors error, - Map failed, - List unmapped + Map failed, + Set groupsToUnmap ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `OffsetFetch` response", groupId, - error.exception()); + log.debug("`OffsetFetch` request for group id {} failed due to error {}", groupId.idValue, error); failed.put(groupId, error.exception()); break; + case COORDINATOR_LOAD_IN_PROGRESS: - case COORDINATOR_NOT_AVAILABLE: + 
// If the coordinator is in the middle of loading, then we just need to retry + log.debug("`OffsetFetch` request for group id {} failed because the coordinator " + + "is still in the process of loading state. Will retry", groupId.idValue); break; + case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("OffsetFetch request for group {} returned error {}. Will retry", - groupId, error); - unmapped.add(groupId); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`OffsetFetch` request for group id {} returned error {}. " + + "Will attempt to find the coordinator again and retry", groupId.idValue, error); + groupsToUnmap.add(groupId); break; + default: - log.error("Received unexpected error for group {} in `OffsetFetch` response", - groupId, error.exception()); - failed.put(groupId, error.exception( - "Received unexpected error for group " + groupId + " in `OffsetFetch` response")); + log.error("`OffsetFetch` request for group id {} failed due to unexpected error {}", groupId.idValue, error); + failed.put(groupId, error.exception()); } } - } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 53e326ad8955f..977b0374b8e51 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -2960,16 +2960,14 @@ public void testListConsumerGroupOffsetsRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - env.kafkaClient().prepareResponse( - new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap())); - env.kafkaClient().prepareResponse( new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, 
Collections.emptyMap())); - /* * We need to return two responses here, one for NOT_COORDINATOR call when calling list consumer offsets * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * The same applies to the following COORDINATOR_NOT_AVAILABLE error response. */ env.kafkaClient().prepareResponse( new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap())); @@ -2977,6 +2975,12 @@ public void testListConsumerGroupOffsetsRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse( + new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap())); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse( new OffsetFetchResponse(Errors.NONE, Collections.emptyMap())); @@ -3020,17 +3024,21 @@ public void testListConsumerGroupOffsets() throws Exception { env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); // Retriable errors should be retried - env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap())); env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap())); /* * We need to return two responses here, one for NOT_COORDINATOR error when calling list consumer group offsets * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. 
+ * + * The same applies to the following COORDINATOR_NOT_AVAILABLE error response. */ env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap())); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0); TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1); TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java index b461ea3b23d41..9c9bb1e58adb5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java @@ -24,9 +24,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.Node; @@ -67,15 +69,26 @@ public void testSuccessfulHandleResponse() { assertCompleted(handleWithError(Errors.NONE), expected); } + + @Test + public void testSuccessfulHandleResponseWithOnePartitionError() { + Map expectedResult = Collections.singletonMap(t0p0, new OffsetAndMetadata(10L)); + + // expected that there's only 1 partition result returned because the other partition is skipped with error + 
assertCompleted(handleWithPartitionError(Errors.UNKNOWN_TOPIC_OR_PARTITION), expectedResult); + assertCompleted(handleWithPartitionError(Errors.TOPIC_AUTHORIZATION_FAILED), expectedResult); + assertCompleted(handleWithPartitionError(Errors.UNSTABLE_OFFSET_COMMIT), expectedResult); + } + @Test public void testUnmappedHandleResponse() { + assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); } @Test public void testRetriableHandleResponse() { assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test @@ -91,6 +104,24 @@ private OffsetFetchResponse buildResponse(Errors error) { return response; } + private OffsetFetchResponse buildResponseWithPartitionError(Errors error) { + + Map responseData = new HashMap<>(); + responseData.put(t0p0, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", Errors.NONE)); + responseData.put(t0p1, new OffsetFetchResponse.PartitionData(10, Optional.empty(), "", error)); + + OffsetFetchResponse response = new OffsetFetchResponse(Errors.NONE, responseData); + return response; + } + + private AdminApiHandler.ApiResult> handleWithPartitionError( + Errors error + ) { + ListConsumerGroupOffsetsHandler handler = new ListConsumerGroupOffsetsHandler(groupId, tps, logContext); + OffsetFetchResponse response = buildResponseWithPartitionError(error); + return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response); + } + private AdminApiHandler.ApiResult> handleWithError( Errors error ) {