From 4fd9408164c766cc6d6acbca959a3808f8f5221f Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Sun, 4 Jul 2021 17:57:22 +0800 Subject: [PATCH 1/5] KAFKA-13033: COORDINATOR_NOT_AVAILABLE should be unmapped --- .../AlterConsumerGroupOffsetsHandler.java | 9 ++++++++- .../DeleteConsumerGroupOffsetsHandler.java | 11 ++++++++--- .../internals/DeleteConsumerGroupsHandler.java | 11 ++++++++--- .../DescribeConsumerGroupsHandler.java | 11 ++++++++--- .../ListConsumerGroupOffsetsHandler.java | 13 ++++++++++--- .../RemoveMembersFromConsumerGroupHandler.java | 13 ++++++++++--- .../AlterConsumerGroupOffsetsHandlerTest.java | 17 +++++++++++++++-- .../DeleteConsumerGroupOffsetsHandlerTest.java | 2 +- .../DeleteConsumerGroupsHandlerTest.java | 2 +- .../DescribeConsumerGroupsHandlerTest.java | 2 +- .../ListConsumerGroupOffsetsHandlerTest.java | 2 +- ...moveMembersFromConsumerGroupHandlerTest.java | 2 +- 12 files changed, 72 insertions(+), 23 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java index cd99b54c72a80..4c4c743915d97 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java @@ -141,9 +141,16 @@ private void handleError( failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("OffsetCommit request for group {} failed because the coordinator" + + " is still in the process of loading state. Will retry. Will retry", groupId); + break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("OffsetCommit request for group {} returned error {}. Will retry", groupId, error); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("OffsetCommit request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry.", groupId, error); unmapped.add(groupId); break; default: diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java index 7e8b549b323c2..a93a0486006f0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java @@ -136,11 +136,16 @@ private boolean handleError( failed.put(groupId, error.exception()); return true; case COORDINATOR_LOAD_IN_PROGRESS: - case COORDINATOR_NOT_AVAILABLE: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("DeleteConsumerGroupOffsets request for group {} failed because the coordinator" + + " is still in the process of loading state. Will retry. Will retry", groupId); return true; + case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("DeleteConsumerGroupOffsets request for group {} returned error {}. Will retry", - groupId, error); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("DeleteConsumerGroupOffsets request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry.", groupId, error); unmapped.add(groupId); return true; default: diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java index c5d220538129a..0e1bbff6ad04c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java @@ -117,11 +117,16 @@ private void handleError( failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: - case COORDINATOR_NOT_AVAILABLE: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("DeleteConsumerGroups request for group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", groupId); break; + case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("DeleteConsumerGroups request for group {} returned error {}. Will retry", - groupId, error); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("DeleteConsumerGroups request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", groupId, error); unmapped.add(groupId); break; default: diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java index 8a94becef1a07..72439d5ff1c10 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java @@ -171,11 +171,16 @@ private void handleError( failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: - case COORDINATOR_NOT_AVAILABLE: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("DeleteConsumerGroups request for group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", groupId); break; + case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("DescribeGroups request for group {} returned error {}. Will retry", - groupId, error); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("DescribeGroups request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", groupId, error); unmapped.add(groupId); break; default: diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java index 4439bc3b02d76..a8c0ee29e4244 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java @@ -129,14 +129,21 @@ private void handleError( error.exception()); failed.put(groupId, error.exception()); break; + case COORDINATOR_LOAD_IN_PROGRESS: - case COORDINATOR_NOT_AVAILABLE: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("OffsetFetch request for group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", groupId); break; + case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("OffsetFetch request for group {} returned error {}. Will retry", - groupId, error); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("OffsetFetch request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", groupId, error); unmapped.add(groupId); break; + default: log.error("Received unexpected error for group {} in `OffsetFetch` response", groupId, error.exception()); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java index c6af2d4a3db7b..edd7798b91c57 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java @@ -113,14 +113,21 @@ private void handleError( error.exception()); failed.put(groupId, error.exception()); break; + case COORDINATOR_LOAD_IN_PROGRESS: - case COORDINATOR_NOT_AVAILABLE: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("LeaveGroup request for group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", groupId); break; + case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("LeaveGroup request for group {} returned error {}. Will retry", - groupId, error); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("LeaveGroup request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", groupId, error); unmapped.add(groupId); break; + default: log.error("Received unexpected error for group {} in `LeaveGroup` response", groupId, error.exception()); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java index c20107f67aa74..b211379497611 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.OffsetCommitResponse; @@ -81,12 +82,16 @@ public void testSuccessfulHandleResponse() { } @Test - public void testRetriableHandleResponse() { + public void testUnmappedHandleResponse() { assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); - assertUnmapped(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } + @Test + public void testRetriableHandleResponse() { + assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); + } + @Test public void testFailedHandleResponse() { assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED)); @@ -102,6 +107,14 @@ private AdminApiHandler.ApiResult> h return handler.handleResponse(node, singleton(CoordinatorKey.byGroupId(groupId)), response); } + private void assertRetriable( + AdminApiHandler.ApiResult> result + ) { + assertEquals(emptySet(), result.completedKeys.keySet()); + assertEquals(emptySet(), result.failedKeys.keySet()); + assertEquals(emptyList(), result.unmappedKeys); + } + private void assertUnmapped( AdminApiHandler.ApiResult> result ) { diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java index 439b37733d98c..cba239920d9a9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java @@ -73,12 +73,12 @@ public void testSuccessfulHandleResponse() { @Test public void testUnmappedHandleResponse() { assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); + assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test public void testRetriableHandleResponse() { assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java index 30a65f4c8fcc6..4a90a89750fa2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java @@ -57,12 +57,12 @@ public void testSuccessfulHandleResponse() { @Test public void testUnmappedHandleResponse() { assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); + assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test public void testRetriableHandleResponse() { assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java index fe26043613773..65d91c92b0617 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java @@ -105,12 +105,12 @@ public void testSuccessfulHandleResponse() { @Test public void testUnmappedHandleResponse() { assertUnmapped(handleWithError(Errors.NOT_COORDINATOR, "")); + assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE, "")); } @Test public void testRetriableHandleResponse() { assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS, "")); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE, "")); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java index 5c98940645199..654ad0a43af84 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java @@ -70,12 +70,12 @@ public void testSuccessfulHandleResponse() { @Test public void testUnmappedHandleResponse() { assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); + assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test public void testRetriableHandleResponse() { assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java index 0ffa43b4c0a80..854bbe7ab3c4e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java @@ -69,12 +69,12 @@ public void testSuccessfulHandleResponse() { @Test public void testUnmappedHandleResponse() { assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); + assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test public void testRetriableHandleResponse() { assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test From fe764811e8503cf47cc8b940f1167507cecd0379 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Mon, 5 Jul 2021 11:48:19 +0800 Subject: [PATCH 2/5] KAFKA-13033: fix tests --- .../AlterConsumerGroupOffsetsHandler.java | 8 +- .../clients/admin/KafkaAdminClientTest.java | 136 +++++++++++------- .../AlterConsumerGroupOffsetsHandlerTest.java | 1 - ...DeleteConsumerGroupOffsetsHandlerTest.java | 2 +- .../DeleteConsumerGroupsHandlerTest.java | 2 +- .../DescribeConsumerGroupsHandlerTest.java | 2 +- .../ListConsumerGroupOffsetsHandlerTest.java | 2 +- 7 files changed, 93 insertions(+), 60 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java index 4c4c743915d97..270f81fba795e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java @@ -111,6 +111,7 @@ public ApiResult> handleResponse( List unmapped = new ArrayList<>(); Map partitions = new HashMap<>(); + int totalPartitionCount = 0; for (OffsetCommitResponseTopic topic : response.data().topics()) { for (OffsetCommitResponsePartition partition : topic.partitions()) { TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex()); @@ -120,9 +121,14 @@ public ApiResult> handleResponse( } else { partitions.put(tp, error); } + totalPartitionCount++; } } - if (failed.isEmpty() && unmapped.isEmpty()) + // only complete this request when: + // 1. no fail + // 2. no unmapped + // 3. all partitions are handled (i.e. no need to retry) + if (failed.isEmpty() && unmapped.isEmpty() && partitions.size() == totalPartitionCount) completed.put(groupId, partitions); return new ApiResult<>(completed, failed, unmapped); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 446e22d17ff93..8b5df938893f0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -2689,7 +2689,6 @@ public void testDescribeConsumerGroups() throws Exception { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); //Retriable FindCoordinatorResponse errors should be retried - env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); @@ -2707,21 +2706,12 @@ public void testDescribeConsumerGroups() throws Exception { Collections.emptySet())); env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); - data = new DescribeGroupsResponseData(); - data.groups().add(DescribeGroupsResponse.groupMetadata( - GROUP_ID, - Errors.COORDINATOR_NOT_AVAILABLE, - "", - "", - "", - Collections.emptyList(), - Collections.emptySet())); - env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); - /* * We need to return two responses here, one with NOT_COORDINATOR error when calling describe consumer group * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * And the same reason for COORDINATOR_NOT_AVAILABLE error response */ data = new DescribeGroupsResponseData(); data.groups().add(DescribeGroupsResponse.groupMetadata( @@ -2735,6 +2725,18 @@ public void testDescribeConsumerGroups() throws Exception { env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + data = new DescribeGroupsResponseData(); + data.groups().add(DescribeGroupsResponse.groupMetadata( + GROUP_ID, + Errors.COORDINATOR_NOT_AVAILABLE, + "", + "", + "", + Collections.emptyList(), + Collections.emptySet())); + env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + data = new DescribeGroupsResponseData(); TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0); TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1); @@ -2960,9 +2962,6 @@ public void testListConsumerGroupOffsetsRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - env.kafkaClient().prepareResponse( - new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap())); - env.kafkaClient().prepareResponse( new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap())); @@ -2970,6 +2969,8 @@ public void testListConsumerGroupOffsetsRetriableErrors() throws Exception { * We need to return two responses here, one for NOT_COORDINATOR call when calling list consumer offsets * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response */ env.kafkaClient().prepareResponse( new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap())); @@ -2977,6 +2978,12 @@ public void testListConsumerGroupOffsetsRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse( + new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap())); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse( new OffsetFetchResponse(Errors.NONE, Collections.emptyMap())); @@ -3015,22 +3022,26 @@ public void testListConsumerGroupOffsets() throws Exception { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); // Retriable FindCoordinatorResponse errors should be retried - env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); // Retriable errors should be retried - env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap())); env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap())); /* * We need to return two responses here, one for NOT_COORDINATOR error when calling list consumer group offsets * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response */ env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap())); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0); TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1); TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2); @@ -3158,7 +3169,6 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { // dummy response for MockCLient to handle the UnsupportedVersionException correctly env.kafkaClient().prepareResponse(null); //Retriable FindCoordinatorResponse errors should be retried - env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); @@ -3183,43 +3193,49 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { env.kafkaClient().prepareResponse( prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode())); - final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds); + DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds); TestUtils.assertFutureError(errorResult.deletedGroups().get("groupId"), GroupAuthorizationException.class); // dummy response for MockCLient to handle the UnsupportedVersionException correctly env.kafkaClient().prepareResponse(null); - //Retriable errors should be retried + // Retriable errors should be retried env.kafkaClient().prepareResponse( prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - final DeletableGroupResultCollection errorResponse1 = new DeletableGroupResultCollection(); - errorResponse1.add(new DeletableGroupResult() - .setGroupId("groupId") - .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) - ); - env.kafkaClient().prepareResponse(new DeleteGroupsResponse( - new DeleteGroupsResponseData() - .setResults(errorResponse1))); - - final DeletableGroupResultCollection errorResponse2 = new DeletableGroupResultCollection(); - errorResponse2.add(new DeletableGroupResult() - .setGroupId("groupId") - .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) + final DeletableGroupResultCollection errorResponse = new DeletableGroupResultCollection(); + errorResponse.add(new DeletableGroupResult() + .setGroupId("groupId") + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) ); env.kafkaClient().prepareResponse(new DeleteGroupsResponse( new DeleteGroupsResponseData() - .setResults(errorResponse2))); + .setResults(errorResponse))); /* * We need to return two responses here, one for NOT_COORDINATOR call when calling delete a consumer group * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response */ - final DeletableGroupResultCollection coordinatorMoved = new DeletableGroupResultCollection(); + + DeletableGroupResultCollection coordinatorMoved = new DeletableGroupResultCollection(); coordinatorMoved.add(new DeletableGroupResult() - .setGroupId("groupId") - .setErrorCode(Errors.NOT_COORDINATOR.code()) + .setGroupId("groupId") + .setErrorCode(Errors.NOT_COORDINATOR.code()) + ); + + env.kafkaClient().prepareResponse(new DeleteGroupsResponse( + new DeleteGroupsResponseData() + .setResults(coordinatorMoved))); + env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + coordinatorMoved = new DeletableGroupResultCollection(); + coordinatorMoved.add(new DeletableGroupResult() + .setGroupId("groupId") + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) ); + env.kafkaClient().prepareResponse(new DeleteGroupsResponse( new DeleteGroupsResponseData() .setResults(coordinatorMoved))); @@ -3229,9 +3245,9 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { new DeleteGroupsResponseData() .setResults(validResponse))); - final DeleteConsumerGroupsResult errorResult1 = env.adminClient().deleteConsumerGroups(groupIds); + errorResult = env.adminClient().deleteConsumerGroups(groupIds); - final KafkaFuture errorResults = errorResult1.deletedGroups().get("groupId"); + final KafkaFuture errorResults = errorResult.deletedGroups().get("groupId"); assertNull(errorResults.get()); } } @@ -3359,9 +3375,6 @@ public void testDeleteConsumerGroupOffsetsRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - env.kafkaClient().prepareResponse( - prepareOffsetDeleteResponse(Errors.COORDINATOR_NOT_AVAILABLE)); - env.kafkaClient().prepareResponse( prepareOffsetDeleteResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS)); @@ -3369,6 +3382,8 @@ public void testDeleteConsumerGroupOffsetsRetriableErrors() throws Exception { * We need to return two responses here, one for NOT_COORDINATOR call when calling delete a consumer group * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response */ env.kafkaClient().prepareResponse( prepareOffsetDeleteResponse(Errors.NOT_COORDINATOR)); @@ -3376,6 +3391,12 @@ public void testDeleteConsumerGroupOffsetsRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse( + prepareOffsetDeleteResponse(Errors.COORDINATOR_NOT_AVAILABLE)); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse( prepareOffsetDeleteResponse("foo", 0, Errors.NONE)); @@ -3599,10 +3620,6 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - env.kafkaClient().prepareResponse( - new LeaveGroupResponse(new LeaveGroupResponseData() - .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()))); - env.kafkaClient().prepareResponse( new LeaveGroupResponse(new LeaveGroupResponseData() .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()))); @@ -3611,6 +3628,8 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception { * We need to return two responses here, one for NOT_COORDINATOR call when calling remove member * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response */ env.kafkaClient().prepareResponse( new LeaveGroupResponse(new LeaveGroupResponseData() @@ -3619,6 +3638,13 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse( + new LeaveGroupResponse(new LeaveGroupResponseData() + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()))); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + MemberResponse memberResponse = new MemberResponse() .setGroupInstanceId("instance-1") .setErrorCode(Errors.NONE.code()); @@ -3677,13 +3703,10 @@ public void testRemoveMembersFromGroup() throws Exception { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); // Retriable FindCoordinatorResponse errors should be retried - env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); // Retriable errors should be retried - env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData() - .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()))); env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData() .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()))); @@ -4059,19 +4082,24 @@ public void testAlterConsumerGroupOffsetsRetriableErrors() throws Exception { prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); env.kafkaClient().prepareResponse( - prepareOffsetCommitResponse(tp1, Errors.COORDINATOR_NOT_AVAILABLE)); + prepareOffsetCommitResponse(tp1, Errors.COORDINATOR_LOAD_IN_PROGRESS)); - env.kafkaClient().prepareResponse( - prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + /* + * We need to return two responses here, one for NOT_COORDINATOR call when calling remove member + * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a + * FindCoordinatorResponse. + * + * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response + */ env.kafkaClient().prepareResponse( - prepareOffsetCommitResponse(tp1, Errors.COORDINATOR_LOAD_IN_PROGRESS)); + prepareOffsetCommitResponse(tp1, Errors.NOT_COORDINATOR)); env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); env.kafkaClient().prepareResponse( - prepareOffsetCommitResponse(tp1, Errors.NOT_COORDINATOR)); + prepareOffsetCommitResponse(tp1, Errors.COORDINATOR_NOT_AVAILABLE)); env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java index b211379497611..4a1e029c2ffa3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandlerTest.java @@ -33,7 +33,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.UnknownServerException; -import org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.OffsetCommitResponse; diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java index cba239920d9a9..669d0fda3bc30 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java @@ -73,7 +73,7 @@ public void testSuccessfulHandleResponse() { @Test public void testUnmappedHandleResponse() { assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); + assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java index 4a90a89750fa2..85aeaac5e551c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandlerTest.java @@ -57,7 +57,7 @@ public void testSuccessfulHandleResponse() { @Test public void testUnmappedHandleResponse() { assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); + assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java index 65d91c92b0617..8cf7ef86d771d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java @@ -105,7 +105,7 @@ public void testSuccessfulHandleResponse() { @Test public void testUnmappedHandleResponse() { assertUnmapped(handleWithError(Errors.NOT_COORDINATOR, "")); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE, "")); + assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE, "")); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java index 654ad0a43af84..cbe78217cb43f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java @@ -70,7 +70,7 @@ public void testSuccessfulHandleResponse() { @Test public void testUnmappedHandleResponse() { assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); + assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test From 412f12bf58adaf953e212fe2b3c25a5c8eed82e9 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 7 Jul 2021 16:31:10 +0800 Subject: [PATCH 3/5] KAFKA-13033: use apiName to refactor the message --- .../AlterConsumerGroupOffsetsHandler.java | 20 +++++++++--------- .../DeleteConsumerGroupOffsetsHandler.java | 16 ++++++++------ .../DeleteConsumerGroupsHandler.java | 19 +++++++++-------- .../DescribeConsumerGroupsHandler.java | 20 +++++++++--------- .../DescribeTransactionsHandler.java | 14 ++++++------- .../ListConsumerGroupOffsetsHandler.java | 20 +++++++++--------- ...RemoveMembersFromConsumerGroupHandler.java | 21 +++++++++---------- 7 files changed, 67 insertions(+), 63 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java index 270f81fba795e..c495f63cb9667 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java @@ -142,28 +142,28 @@ private void handleError( ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `OffsetCommit` response", groupId, - error.exception()); + log.error("Received authorization failure for group {} in `{}` response", groupId, + apiName(), error.exception()); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry - log.debug("OffsetCommit request for group {} failed because the coordinator" + - " is still in the process of loading state. Will retry. Will retry", groupId); + log.debug("`{}` request for group {} failed because the coordinator" + + " is still in the process of loading state. Will retry.", apiName(), groupId); break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: // If the coordinator is unavailable or there was a coordinator change, then we unmap // the key so that we retry the `FindCoordinator` request - log.debug("OffsetCommit request for group {} returned error {}. " + - "Will attempt to find the coordinator again and retry.", groupId, error); + log.debug("`{}` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry.", apiName(), groupId, error); unmapped.add(groupId); break; default: - log.error("Received unexpected error for group {} in `OffsetCommit` response", - groupId, error.exception()); - failed.put(groupId, error.exception( - "Received unexpected error for group " + groupId + " in `OffsetCommit` response")); + final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", + groupId, apiName()); + log.error(unexpectedErrorMsg, groupId, apiName(), error.exception()); + failed.put(groupId, error.exception(unexpectedErrorMsg)); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java index a93a0486006f0..f788dc75b53a8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java @@ -131,24 +131,28 @@ private boolean handleError( case GROUP_AUTHORIZATION_FAILED: case GROUP_ID_NOT_FOUND: case INVALID_GROUP_ID: - log.error("Received non retriable error for group {} in `DeleteConsumerGroupOffsets` response", groupId, - error.exception()); + log.error("Received non retriable error for group {} in `{}` response", groupId, + apiName(), error.exception()); failed.put(groupId, error.exception()); return true; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry - log.debug("DeleteConsumerGroupOffsets request for group {} failed because the coordinator" + - " is still in the process of loading state. Will retry. Will retry", groupId); + log.debug("`{}` request for group {} failed because the coordinator" + + " is still in the process of loading state. Will retry.", apiName(), groupId); return true; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: // If the coordinator is unavailable or there was a coordinator change, then we unmap // the key so that we retry the `FindCoordinator` request - log.debug("DeleteConsumerGroupOffsets request for group {} returned error {}. " + - "Will attempt to find the coordinator again and retry.", groupId, error); + log.debug("`{}` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry.", apiName(), groupId, error); unmapped.add(groupId); return true; default: + final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", + groupId, apiName()); + log.error(unexpectedErrorMsg, error.exception()); + failed.put(groupId, error.exception(unexpectedErrorMsg)); return false; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java index 0e1bbff6ad04c..5cd30d49ff57c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java @@ -112,27 +112,28 @@ private void handleError( ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `DeleteConsumerGroups` response", groupId, - error.exception()); + log.error("Received authorization failure for group {} in `{}` response", groupId, + apiName(), error.exception()); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry - log.debug("DeleteConsumerGroups request for group {} failed because the coordinator " + - "is still in the process of loading state. Will retry", groupId); + log.debug("`{}` request for group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", apiName(), groupId); break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: // If the coordinator is unavailable or there was a coordinator change, then we unmap // the key so that we retry the `FindCoordinator` request - log.debug("DeleteConsumerGroups request for group {} returned error {}. " + - "Will attempt to find the coordinator again and retry", groupId, error); + log.debug("`{}` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", apiName(), groupId, error); unmapped.add(groupId); break; default: - log.error("Received unexpected error for group {} in `DeleteConsumerGroups` response", - groupId, error.exception()); - failed.put(groupId, error.exception()); + final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", + groupId, apiName()); + log.error(unexpectedErrorMsg, error.exception()); + failed.put(groupId, error.exception(unexpectedErrorMsg)); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java index 72439d5ff1c10..978576c0c95cc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java @@ -166,28 +166,28 @@ private void handleError( ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `DescribeGroups` response", groupId, - error.exception()); + log.error("Received authorization failure for group {} in `{}` response", groupId, + apiName(), error.exception()); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry - log.debug("DeleteConsumerGroups request for group {} failed because the coordinator " + - "is still in the process of loading state. Will retry", groupId); + log.debug("`{}` request for group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", apiName(), groupId); break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: // If the coordinator is unavailable or there was a coordinator change, then we unmap // the key so that we retry the `FindCoordinator` request - log.debug("DescribeGroups request for group {} returned error {}. " + - "Will attempt to find the coordinator again and retry", groupId, error); + log.debug("`{}` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", apiName(), groupId, error); unmapped.add(groupId); break; default: - log.error("Received unexpected error for group {} in `DescribeGroups` response", - groupId, error.exception()); - failed.put(groupId, error.exception( - "Received unexpected error for group " + groupId + " in `DescribeGroups` response")); + final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", + groupId, apiName()); + log.error(unexpectedErrorMsg, error.exception()); + failed.put(groupId, error.exception(unexpectedErrorMsg)); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java index d270145a423da..640265bd028bc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java @@ -159,21 +159,21 @@ private void handleError( switch (error) { case TRANSACTIONAL_ID_AUTHORIZATION_FAILED: failed.put(transactionalIdKey, new TransactionalIdAuthorizationException( - "DescribeTransactions request for transactionalId `" + transactionalIdKey.idValue + "` " + + apiName() + " request for transactionalId `" + transactionalIdKey.idValue + "` " + "failed due to authorization failure")); break; case TRANSACTIONAL_ID_NOT_FOUND: failed.put(transactionalIdKey, new TransactionalIdNotFoundException( - "DescribeTransactions request for transactionalId `" + transactionalIdKey.idValue + "` " + + apiName() + " request for transactionalId `" + transactionalIdKey.idValue + "` " + "failed because the ID could not be found")); break; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry - log.debug("DescribeTransactions request for transactionalId `{}` failed because the " + + log.debug("{} request for transactionalId `{}` failed because the " + "coordinator is still in the process of loading state. Will retry", - transactionalIdKey.idValue); + apiName(), transactionalIdKey.idValue); break; case NOT_COORDINATOR: @@ -181,12 +181,12 @@ private void handleError( // If the coordinator is unavailable or there was a coordinator change, then we unmap // the key so that we retry the `FindCoordinator` request unmapped.add(transactionalIdKey); - log.debug("DescribeTransactions request for transactionalId `{}` returned error {}. Will attempt " + - "to find the coordinator again and retry", transactionalIdKey.idValue, error); + log.debug("{} request for transactionalId `{}` returned error {}. Will attempt " + + "to find the coordinator again and retry", apiName(), transactionalIdKey.idValue, error); break; default: - failed.put(transactionalIdKey, error.exception("DescribeTransactions request for " + + failed.put(transactionalIdKey, error.exception(apiName() + " request for " + "transactionalId `" + transactionalIdKey.idValue + "` failed due to unexpected error")); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java index a8c0ee29e4244..9cf3d6e572110 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java @@ -125,30 +125,30 @@ private void handleError( ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `OffsetFetch` response", groupId, - error.exception()); + log.error("Received authorization failure for group {} in `{}` response", groupId, + apiName(), error.exception()); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry - log.debug("OffsetFetch request for group {} failed because the coordinator " + - "is still in the process of loading state. Will retry", groupId); + log.debug("`{}` request for group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", apiName(), groupId); break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: // If the coordinator is unavailable or there was a coordinator change, then we unmap // the key so that we retry the `FindCoordinator` request - log.debug("OffsetFetch request for group {} returned error {}. " + - "Will attempt to find the coordinator again and retry", groupId, error); + log.debug("`{}` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", apiName(), groupId, error); unmapped.add(groupId); break; default: - log.error("Received unexpected error for group {} in `OffsetFetch` response", - groupId, error.exception()); - failed.put(groupId, error.exception( - "Received unexpected error for group " + groupId + " in `OffsetFetch` response")); + final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", + groupId, apiName()); + log.error(unexpectedErrorMsg, error.exception()); + failed.put(groupId, error.exception(unexpectedErrorMsg)); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java index edd7798b91c57..446eb2710cf69 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java @@ -109,31 +109,30 @@ private void handleError( ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `LeaveGroup` response", groupId, - error.exception()); + log.error("Received authorization failure for group {} in `{}` response", groupId, + apiName(), error.exception()); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry - log.debug("LeaveGroup request for group {} failed because the coordinator " + - "is still in the process of loading state. Will retry", groupId); + log.debug("`{}` request for group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", apiName(), groupId); break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: // If the coordinator is unavailable or there was a coordinator change, then we unmap // the key so that we retry the `FindCoordinator` request - log.debug("LeaveGroup request for group {} returned error {}. " + - "Will attempt to find the coordinator again and retry", groupId, error); + log.debug("`{}` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", apiName(), groupId, error); unmapped.add(groupId); break; default: - log.error("Received unexpected error for group {} in `LeaveGroup` response", - groupId, error.exception()); - failed.put(groupId, error.exception( - "Received unexpected error for group " + groupId + " in `LeaveGroup` response")); - break; + final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", + groupId, apiName()); + log.error(unexpectedErrorMsg, error.exception()); + failed.put(groupId, error.exception(unexpectedErrorMsg)); } } From ea0ce2950669e84274d851aca4059d02f5e11d05 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 7 Jul 2021 16:44:38 +0800 Subject: [PATCH 4/5] KAFKA-13033: fix broken test --- .../AlterConsumerGroupOffsetsHandler.java | 2 +- .../DeleteConsumerGroupOffsetsHandler.java | 4 ---- .../internals/DeleteConsumerGroupsHandler.java | 2 +- .../internals/ListTransactionsHandler.java | 18 +++++++++--------- .../clients/admin/KafkaAdminClientTest.java | 2 -- 5 files changed, 11 insertions(+), 17 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java index c495f63cb9667..2ba86cfffea09 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java @@ -162,7 +162,7 @@ private void handleError( default: final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", groupId, apiName()); - log.error(unexpectedErrorMsg, groupId, apiName(), error.exception()); + log.error(unexpectedErrorMsg, error.exception()); failed.put(groupId, error.exception(unexpectedErrorMsg)); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java index f788dc75b53a8..7eb53805dd4d0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java @@ -149,10 +149,6 @@ private boolean handleError( unmapped.add(groupId); return true; default: - final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", - groupId, apiName()); - log.error(unexpectedErrorMsg, error.exception()); - failed.put(groupId, error.exception(unexpectedErrorMsg)); return false; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java index 5cd30d49ff57c..01565d964fa56 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java @@ -133,7 +133,7 @@ private void handleError( final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", groupId, apiName()); log.error(unexpectedErrorMsg, error.exception()); - failed.put(groupId, error.exception(unexpectedErrorMsg)); + failed.put(groupId, error.exception()); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java index d60580c85bf22..2eb8f1247d6e7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java @@ -89,18 +89,18 @@ public ApiResult> h Errors error = Errors.forCode(response.data().errorCode()); if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) { - log.debug("The `ListTransactions` request sent to broker {} failed because the " + - "coordinator is still loading state. Will try again after backing off", brokerId); + log.debug("The `{}` request sent to broker {} failed because the " + + "coordinator is still loading state. Will try again after backing off", apiName(), brokerId); return ApiResult.empty(); } else if (error == Errors.COORDINATOR_NOT_AVAILABLE) { - log.debug("The `ListTransactions` request sent to broker {} failed because the " + - "coordinator is shutting down", brokerId); - return ApiResult.failed(key, new CoordinatorNotAvailableException("ListTransactions " + - "request sent to broker " + brokerId + " failed because the coordinator is shutting down")); + log.debug("The `{}` request sent to broker {} failed because the " + + "coordinator is shutting down", apiName(), brokerId); + return ApiResult.failed(key, new CoordinatorNotAvailableException(apiName() + + " request sent to broker " + brokerId + " failed because the coordinator is shutting down")); } else if (error != Errors.NONE) { - log.error("The `ListTransactions` request sent to broker {} failed because of an " + - "unexpected error {}", brokerId, error); - return ApiResult.failed(key, error.exception("ListTransactions request " + + log.error("The `{}` request sent to broker {} failed because of an " + + "unexpected error {}", apiName(), brokerId, error); + return ApiResult.failed(key, error.exception(apiName() + " request " + "sent to broker " + brokerId + " failed with an unexpected exception")); } else { List listings = response.data().transactionStates().stream() diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 61901d9e8cfa8..f29ab4f872fe5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -3192,8 +3192,6 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds); TestUtils.assertFutureError(errorResult.deletedGroups().get("groupId"), GroupAuthorizationException.class); - // dummy response for MockCLient to handle the UnsupportedVersionException correctly - env.kafkaClient().prepareResponse(null); // Retriable errors should be retried env.kafkaClient().prepareResponse( prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); From f9db3e4a677994862a952920d035cadd52b4c637 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Sun, 11 Jul 2021 20:21:36 +0800 Subject: [PATCH 5/5] KAFKA-13033: handle all possible errors in handler --- .../DeleteConsumerGroupOffsetsHandler.java | 29 ++++--- .../DeleteConsumerGroupsHandler.java | 7 ++ ...DeleteConsumerGroupOffsetsHandlerTest.java | 83 ++++++++++++++++--- 3 files changed, 95 insertions(+), 24 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java index 7eb53805dd4d0..c4fe653334810 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandler.java @@ -104,15 +104,18 @@ public ApiResult> handleResponse( final Errors error = Errors.forCode(response.data().errorCode()); if (error != Errors.NONE) { - handleError(groupId, error, failed, unmapped); + handleGroupLevelError(groupId, error, failed, unmapped); } else { final Map partitions = new HashMap<>(); response.data().topics().forEach(topic -> - topic.partitions().forEach(partition -> { - Errors partitionError = Errors.forCode(partition.errorCode()); - if (!handleError(groupId, partitionError, failed, unmapped)) { - partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError); + topic.partitions().forEach(partitionOffsetDeleteResponse -> { + Errors partitionError = Errors.forCode(partitionOffsetDeleteResponse.errorCode()); + TopicPartition tp = new TopicPartition(topic.name(), partitionOffsetDeleteResponse.partitionIndex()); + if (log.isDebugEnabled() && partitionError != Errors.NONE) { + log.debug("`{}` request for group {} returned error {} in the partition {}.", + apiName(), groupId, partitionError, tp); } + partitions.put(tp, partitionError); }) ); if (!partitions.isEmpty()) @@ -121,7 +124,7 @@ public ApiResult> handleResponse( return new ApiResult<>(completed, failed, unmapped); } - private boolean handleError( + private void handleGroupLevelError( CoordinatorKey groupId, Errors error, Map failed, @@ -131,15 +134,16 @@ private boolean handleError( case GROUP_AUTHORIZATION_FAILED: case GROUP_ID_NOT_FOUND: case INVALID_GROUP_ID: + case NON_EMPTY_GROUP: log.error("Received non retriable error for group {} in `{}` response", groupId, - apiName(), error.exception()); + apiName(), error.exception()); failed.put(groupId, error.exception()); - return true; + break; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry log.debug("`{}` request for group {} failed because the coordinator" + " is still in the process of loading state. Will retry.", apiName(), groupId); - return true; + break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: // If the coordinator is unavailable or there was a coordinator change, then we unmap @@ -147,9 +151,12 @@ private boolean handleError( log.debug("`{}` request for group {} returned error {}. " + "Will attempt to find the coordinator again and retry.", apiName(), groupId, error); unmapped.add(groupId); - return true; + break; default: - return false; + final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", + groupId, apiName()); + log.error(unexpectedErrorMsg, error.exception()); + failed.put(groupId, error.exception()); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java index 01565d964fa56..09b7a7187fc49 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupsHandler.java @@ -116,6 +116,13 @@ private void handleError( apiName(), error.exception()); failed.put(groupId, error.exception()); break; + case INVALID_GROUP_ID: + case NON_EMPTY_GROUP: + case GROUP_ID_NOT_FOUND: + log.error("Received non retriable failure for group {} in `{}` response", groupId, + apiName(), error.exception()); + failed.put(groupId, error.exception()); + break; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry log.debug("`{}` request for group {} failed because the coordinator " + diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java index 669d0fda3bc30..745d80e558d3b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DeleteConsumerGroupOffsetsHandlerTest.java @@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Map; @@ -67,28 +68,58 @@ public void testBuildRequest() { @Test public void testSuccessfulHandleResponse() { Map responseData = Collections.singletonMap(t0p0, Errors.NONE); - assertCompleted(handleWithError(Errors.NONE), responseData); + assertCompleted(handleWithGroupError(Errors.NONE), responseData); } @Test public void testUnmappedHandleResponse() { - assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); - assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); + assertUnmapped(handleWithGroupError(Errors.NOT_COORDINATOR)); + assertUnmapped(handleWithGroupError(Errors.COORDINATOR_NOT_AVAILABLE)); } @Test public void testRetriableHandleResponse() { - assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); + assertRetriable(handleWithGroupError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); } @Test - public void testFailedHandleResponse() { - assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED)); - assertFailed(GroupIdNotFoundException.class, handleWithError(Errors.GROUP_ID_NOT_FOUND)); - assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID)); + public void testFailedHandleResponseWithGroupError() { + assertGroupFailed(GroupAuthorizationException.class, handleWithGroupError(Errors.GROUP_AUTHORIZATION_FAILED)); + assertGroupFailed(GroupIdNotFoundException.class, handleWithGroupError(Errors.GROUP_ID_NOT_FOUND)); + assertGroupFailed(InvalidGroupIdException.class, handleWithGroupError(Errors.INVALID_GROUP_ID)); } - private OffsetDeleteResponse buildResponse(Errors error) { + @Test + public void testFailedHandleResponseWithPartitionError() { + assertPartitionFailed(Collections.singletonMap(t0p0, Errors.GROUP_SUBSCRIBED_TO_TOPIC), + handleWithPartitionError(Errors.GROUP_SUBSCRIBED_TO_TOPIC)); + assertPartitionFailed(Collections.singletonMap(t0p0, Errors.TOPIC_AUTHORIZATION_FAILED), + handleWithPartitionError(Errors.TOPIC_AUTHORIZATION_FAILED)); + assertPartitionFailed(Collections.singletonMap(t0p0, Errors.UNKNOWN_TOPIC_OR_PARTITION), + handleWithPartitionError(Errors.UNKNOWN_TOPIC_OR_PARTITION)); + } + + private OffsetDeleteResponse buildGroupErrorResponse(Errors error) { + OffsetDeleteResponse response = new OffsetDeleteResponse( + new OffsetDeleteResponseData() + .setErrorCode(error.code())); + if (error == Errors.NONE) { + response.data() + .setThrottleTimeMs(0) + .setTopics(new OffsetDeleteResponseTopicCollection(singletonList( + new OffsetDeleteResponseTopic() + .setName("t0") + .setPartitions(new OffsetDeleteResponsePartitionCollection(singletonList( + new OffsetDeleteResponsePartition() + .setPartitionIndex(0) + .setErrorCode(error.code()) + ).iterator())) + ).iterator())); + } + return response; + } + + private OffsetDeleteResponse buildPartitionErrorResponse(Errors error) { OffsetDeleteResponse response = new OffsetDeleteResponse( new OffsetDeleteResponseData() .setThrottleTimeMs(0) @@ -100,15 +131,24 @@ private OffsetDeleteResponse buildResponse(Errors error) { .setPartitionIndex(0) .setErrorCode(error.code()) ).iterator())) - ).iterator()))); + ).iterator())) + ); return response; } - private AdminApiHandler.ApiResult> handleWithError( + private AdminApiHandler.ApiResult> handleWithGroupError( Errors error ) { DeleteConsumerGroupOffsetsHandler handler = new DeleteConsumerGroupOffsetsHandler(groupId, tps, logContext); - OffsetDeleteResponse response = buildResponse(error); + OffsetDeleteResponse response = buildGroupErrorResponse(error); + return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response); + } + + private AdminApiHandler.ApiResult> handleWithPartitionError( + Errors error + ) { + DeleteConsumerGroupOffsetsHandler handler = new DeleteConsumerGroupOffsetsHandler(groupId, tps, logContext); + OffsetDeleteResponse response = buildPartitionErrorResponse(error); return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response); } @@ -139,7 +179,7 @@ private void assertCompleted( assertEquals(expected, result.completedKeys.get(key)); } - private void assertFailed( + private void assertGroupFailed( Class expectedExceptionType, AdminApiHandler.ApiResult> result ) { @@ -149,4 +189,21 @@ private void assertFailed( assertEquals(singleton(key), result.failedKeys.keySet()); assertTrue(expectedExceptionType.isInstance(result.failedKeys.get(key))); } + + private void assertPartitionFailed( + Map expectedResult, + AdminApiHandler.ApiResult> result + ) { + CoordinatorKey key = CoordinatorKey.byGroupId(groupId); + assertEquals(singleton(key), result.completedKeys.keySet()); + + // verify the completed value is expected result + Collection> completeCollection = result.completedKeys.values(); + assertEquals(1, completeCollection.size()); + Map completeMap = completeCollection.iterator().next(); + assertEquals(expectedResult, completeMap); + + assertEquals(emptyList(), result.unmappedKeys); + assertEquals(emptySet(), result.failedKeys.keySet()); + } }