From b4a3bfab865b966b2162b57264b06c06be0ca275 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Tue, 13 Jul 2021 17:37:52 +0800 Subject: [PATCH 1/4] KAFKA-13072: refactor RemoveMembersFromConsumerGroupHandler and tests --- ...RemoveMembersFromConsumerGroupHandler.java | 105 ++++++++++++++---- .../clients/admin/KafkaAdminClientTest.java | 82 ++++++++++++-- ...veMembersFromConsumerGroupHandlerTest.java | 69 +++++++++--- 3 files changed, 214 insertions(+), 42 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java index c6af2d4a3db7b..cc1cc024ed0c9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -82,52 +83,114 @@ public ApiResult> handleResponse( final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse; Map> completed = new HashMap<>(); Map failed = new HashMap<>(); - List unmapped = new ArrayList<>(); + final Set groupsToUnmap = new HashSet<>(); + final Set groupsToRetry = new HashSet<>(); - final Errors error = Errors.forCode(response.data().errorCode()); + final Errors error = response.topLevelError(); if (error != Errors.NONE) { - handleError(groupId, error, failed, unmapped); + handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { final Map memberErrors = new HashMap<>(); for (MemberResponse memberResponse : response.memberResponses()) { + Errors memberError = Errors.forCode(memberResponse.errorCode()); + String memberId = memberResponse.memberId(); + + if (memberError != Errors.NONE) { + handleMemberError(groupId, memberId, memberError, groupsToUnmap, groupsToRetry); + } + memberErrors.put(new MemberIdentity() - .setMemberId(memberResponse.memberId()) + .setMemberId(memberId) .setGroupInstanceId(memberResponse.groupInstanceId()), - Errors.forCode(memberResponse.errorCode())); + memberError); } completed.put(groupId, memberErrors); } - return new ApiResult<>(completed, failed, unmapped); + + if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { + return new ApiResult<>( + completed, + failed, + Collections.emptyList() + ); + } else { + // retry the request, so don't send completed/failed results back + return new ApiResult<>( + Collections.emptyMap(), + Collections.emptyMap(), + new ArrayList<>(groupsToUnmap) + ); + } } - private void handleError( + private void handleGroupError( CoordinatorKey groupId, - Errors error, Map failed, - List unmapped + Errors error, + Map failed, + Set groupsToUnmap, + Set groupsToRetry ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `LeaveGroup` response", groupId, - error.exception()); + log.error("Received authorization failure for group {} in `{}` response", groupId, + apiName(), error.exception()); failed.put(groupId, error.exception()); break; + case COORDINATOR_LOAD_IN_PROGRESS: - case COORDINATOR_NOT_AVAILABLE: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`{}` request for group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", apiName(), groupId); + groupsToRetry.add(groupId); break; + case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("LeaveGroup request for group {} returned error {}. Will retry", - groupId, error); - unmapped.add(groupId); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`{}` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", apiName(), groupId, error); + groupsToUnmap.add(groupId); break; + default: - log.error("Received unexpected error for group {} in `LeaveGroup` response", - groupId, error.exception()); - failed.put(groupId, error.exception( - "Received unexpected error for group " + groupId + " in `LeaveGroup` response")); + final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", + groupId, apiName()); + log.error(unexpectedErrorMsg, error.exception()); + failed.put(groupId, error.exception(unexpectedErrorMsg)); + } + } + + private void handleMemberError( + CoordinatorKey groupId, + String memberId, + Errors error, + Set groupsToUnmap, + Set groupsToRetry + ) { + switch (error) { + case COORDINATOR_LOAD_IN_PROGRESS: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`{}` request for the member {} in group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", apiName(), memberId, groupId); + groupsToRetry.add(groupId); break; + case COORDINATOR_NOT_AVAILABLE: + case NOT_COORDINATOR: + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`{}` request for the member {} in group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", apiName(), memberId, groupId, error); + groupsToUnmap.add(groupId); + break; + case FENCED_INSTANCE_ID: + case UNKNOWN_MEMBER_ID: + log.debug("`{}` request for the member {} in group {} returned error {}.", apiName(), memberId, groupId, error); + break; + default: + log.debug("`{}` request for the member {} in group {} returned unexpected error {}.", + apiName(), memberId, groupId, error); } } -} +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 53e326ad8955f..088869346332f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -3641,10 +3641,6 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - env.kafkaClient().prepareResponse( - new LeaveGroupResponse(new LeaveGroupResponseData() - .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()))); - env.kafkaClient().prepareResponse( new LeaveGroupResponse(new LeaveGroupResponseData() .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()))); @@ -3653,6 +3649,8 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception { * We need to return two responses here, one for NOT_COORDINATOR call when calling remove member * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a * FindCoordinatorResponse. + * + * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response */ env.kafkaClient().prepareResponse( new LeaveGroupResponse(new LeaveGroupResponseData() @@ -3661,6 +3659,13 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception { env.kafkaClient().prepareResponse( prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse( + new LeaveGroupResponse(new LeaveGroupResponseData() + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()))); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + MemberResponse memberResponse = new MemberResponse() .setGroupInstanceId("instance-1") .setErrorCode(Errors.NONE.code()); @@ -3680,6 +3685,72 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception { } } + @Test + public void testRemoveMembersFromGroupRetriableErrorsInMemberResponse() throws Exception { + // Retriable errors should be retried + String groupId = "instance-1"; + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + MemberResponse memberResponse = new MemberResponse() + .setGroupInstanceId(groupId) + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()); + + env.kafkaClient().prepareResponse( + new LeaveGroupResponse(new LeaveGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setMembers(Collections.singletonList(memberResponse)))); + + /* + * We need to return two responses here, one for NOT_COORDINATOR call when calling remove member + * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a + * FindCoordinatorResponse. + * + * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response + */ + + memberResponse.setErrorCode(Errors.NOT_COORDINATOR.code()); + + env.kafkaClient().prepareResponse( + new LeaveGroupResponse(new LeaveGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setMembers(Collections.singletonList(memberResponse)))); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + memberResponse.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()); + + env.kafkaClient().prepareResponse( + new LeaveGroupResponse(new LeaveGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setMembers(Collections.singletonList(memberResponse)))); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + memberResponse.setErrorCode(Errors.NONE.code()); + + env.kafkaClient().prepareResponse( + new LeaveGroupResponse(new LeaveGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setMembers(Collections.singletonList(memberResponse)))); + + MemberToRemove memberToRemove = new MemberToRemove("instance-1"); + Collection membersToRemove = singletonList(memberToRemove); + + final RemoveMembersFromConsumerGroupResult result = env.adminClient().removeMembersFromConsumerGroup( + GROUP_ID, new RemoveMembersFromConsumerGroupOptions(membersToRemove)); + + assertNull(result.all().get()); + assertNull(result.memberResult(memberToRemove).get()); + } + } + @Test public void testRemoveMembersFromGroupNonRetriableErrors() throws Exception { // Non-retriable errors throw an exception @@ -3719,13 +3790,10 @@ public void testRemoveMembersFromGroup() throws Exception { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); // Retriable FindCoordinatorResponse errors should be retried - env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); // Retriable errors should be retried - env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData() - .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()))); env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData() .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()))); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java index 0ffa43b4c0a80..076171bb61008 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java @@ -63,39 +63,60 @@ public void testBuildRequest() { @Test public void testSuccessfulHandleResponse() { Map responseData = Collections.singletonMap(m1, Errors.NONE); - assertCompleted(handleWithError(Errors.NONE), responseData); + assertCompleted(handleWithGroupError(Errors.NONE), responseData); } @Test public void testUnmappedHandleResponse() { - assertUnmapped(handleWithError(Errors.NOT_COORDINATOR)); + assertUnmapped(handleWithGroupError(Errors.COORDINATOR_NOT_AVAILABLE)); + assertUnmapped(handleWithGroupError(Errors.NOT_COORDINATOR)); + assertUnmapped(handleWithMemberError(Errors.COORDINATOR_NOT_AVAILABLE)); + assertUnmapped(handleWithMemberError(Errors.NOT_COORDINATOR)); } @Test public void testRetriableHandleResponse() { - assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); - assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE)); + assertRetriable(handleWithGroupError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); + assertRetriable(handleWithMemberError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); } @Test public void testFailedHandleResponse() { - assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED)); - assertFailed(UnknownServerException.class, handleWithError(Errors.UNKNOWN_SERVER_ERROR)); + assertFailed(GroupAuthorizationException.class, handleWithGroupError(Errors.GROUP_AUTHORIZATION_FAILED)); + assertFailed(UnknownServerException.class, handleWithGroupError(Errors.UNKNOWN_SERVER_ERROR)); + } + + @Test + public void testFailedHandleResponseInMemberLevel() { + assertMemberFailed(Errors.FENCED_INSTANCE_ID, handleWithMemberError(Errors.FENCED_INSTANCE_ID)); + assertMemberFailed(Errors.UNKNOWN_MEMBER_ID, handleWithMemberError(Errors.UNKNOWN_MEMBER_ID)); } private LeaveGroupResponse buildResponse(Errors error) { LeaveGroupResponse response = new LeaveGroupResponse( - new LeaveGroupResponseData() - .setErrorCode(error.code()) - .setMembers(singletonList( - new MemberResponse() - .setErrorCode(error.code()) - .setMemberId("m1") - .setGroupInstanceId("m1-gii")))); + new LeaveGroupResponseData() + .setErrorCode(error.code()) + .setMembers(singletonList( + new MemberResponse() + .setErrorCode(Errors.NONE.code()) + .setMemberId("m1") + .setGroupInstanceId("m1-gii")))); return response; } - private AdminApiHandler.ApiResult> handleWithError( + private LeaveGroupResponse buildResponseWithMemberError(Errors error) { + LeaveGroupResponse response = new LeaveGroupResponse( + new LeaveGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setMembers(singletonList( + new MemberResponse() + .setErrorCode(error.code()) + .setMemberId("m1") + .setGroupInstanceId("m1-gii")))); + return response; + } + + private AdminApiHandler.ApiResult> handleWithGroupError( Errors error ) { RemoveMembersFromConsumerGroupHandler handler = new RemoveMembersFromConsumerGroupHandler(groupId, members, logContext); @@ -103,6 +124,14 @@ private AdminApiHandler.ApiResult> h return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response); } + private AdminApiHandler.ApiResult> handleWithMemberError( + Errors error + ) { + RemoveMembersFromConsumerGroupHandler handler = new RemoveMembersFromConsumerGroupHandler(groupId, members, logContext); + LeaveGroupResponse response = buildResponseWithMemberError(error); + return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId)), response); + } + private void assertUnmapped( AdminApiHandler.ApiResult> result ) { @@ -140,4 +169,16 @@ private void assertFailed( assertEquals(singleton(key), result.failedKeys.keySet()); assertTrue(expectedExceptionType.isInstance(result.failedKeys.get(key))); } + + private void assertMemberFailed( + Errors expectedError, + AdminApiHandler.ApiResult> result + ) { + Map expectedResponseData = Collections.singletonMap(m1, expectedError); + CoordinatorKey key = CoordinatorKey.byGroupId(groupId); + assertEquals(emptySet(), result.failedKeys.keySet()); + assertEquals(emptyList(), result.unmappedKeys); + assertEquals(singleton(key), result.completedKeys.keySet()); + assertEquals(expectedResponseData, result.completedKeys.get(key)); + } } From b7acfcdbdcb3e93dc61f05efecb4d5eeb96baa28 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 14 Jul 2021 15:13:22 +0800 Subject: [PATCH 2/4] KAFKA-13072: refactor code --- ...RemoveMembersFromConsumerGroupHandler.java | 71 ++++++------------- .../clients/admin/KafkaAdminClientTest.java | 2 +- ...veMembersFromConsumerGroupHandlerTest.java | 3 - 3 files changed, 24 insertions(+), 52 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java index cc1cc024ed0c9..57fe0cb3cd244 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java @@ -69,8 +69,18 @@ public static AdminApiFuture.SimpleAdminApiFuture groupIds + ) { + if (!groupIds.equals(Collections.singleton(groupId))) { + throw new IllegalArgumentException("Received unexpected group ids " + groupIds + + " (expected only " + Collections.singleton(groupId) + ")"); + } + } + @Override - public LeaveGroupRequest.Builder buildRequest(int coordinatorId, Set keys) { + public LeaveGroupRequest.Builder buildRequest(int coordinatorId, Set groupIds) { + validateKeys(groupIds); return new LeaveGroupRequest.Builder(groupId.idValue, members); } @@ -80,9 +90,11 @@ public ApiResult> handleResponse( Set groupIds, AbstractResponse abstractResponse ) { + validateKeys(groupIds); + final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse; - Map> completed = new HashMap<>(); - Map failed = new HashMap<>(); + final Map> completed = new HashMap<>(); + final Map failed = new HashMap<>(); final Set groupsToUnmap = new HashSet<>(); final Set groupsToRetry = new HashSet<>(); @@ -95,10 +107,6 @@ public ApiResult> handleResponse( Errors memberError = Errors.forCode(memberResponse.errorCode()); String memberId = memberResponse.memberId(); - if (memberError != Errors.NONE) { - handleMemberError(groupId, memberId, memberError, groupsToUnmap, groupsToRetry); - } - memberErrors.put(new MemberIdentity() .setMemberId(memberId) .setGroupInstanceId(memberResponse.groupInstanceId()), @@ -133,64 +141,31 @@ private void handleGroupError( ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `{}` response", groupId, - apiName(), error.exception()); + log.debug("`LeaveGroup` request for group id {} failed due to error {}", groupId, error); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry - log.debug("`{}` request for group {} failed because the coordinator " + - "is still in the process of loading state. Will retry", apiName(), groupId); + log.debug("`LeaveGroup` request for group {} failed because the coordinator " + + "is still in the process of loading state. Will retry", groupId); groupsToRetry.add(groupId); break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: // If the coordinator is unavailable or there was a coordinator change, then we unmap // the key so that we retry the `FindCoordinator` request - log.debug("`{}` request for group {} returned error {}. " + - "Will attempt to find the coordinator again and retry", apiName(), groupId, error); + log.debug("`LeaveGroup` request for group {} returned error {}. " + + "Will attempt to find the coordinator again and retry", groupId, error); groupsToUnmap.add(groupId); break; default: - final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response", - groupId, apiName()); - log.error(unexpectedErrorMsg, error.exception()); + final String unexpectedErrorMsg = + String.format("`LeaveGroup` request for group id %s failed due to unexpected error %s", groupId, error); + log.error(unexpectedErrorMsg); failed.put(groupId, error.exception(unexpectedErrorMsg)); } } - private void handleMemberError( - CoordinatorKey groupId, - String memberId, - Errors error, - Set groupsToUnmap, - Set groupsToRetry - ) { - switch (error) { - case COORDINATOR_LOAD_IN_PROGRESS: - // If the coordinator is in the middle of loading, then we just need to retry - log.debug("`{}` request for the member {} in group {} failed because the coordinator " + - "is still in the process of loading state. Will retry", apiName(), memberId, groupId); - groupsToRetry.add(groupId); - break; - case COORDINATOR_NOT_AVAILABLE: - case NOT_COORDINATOR: - // If the coordinator is unavailable or there was a coordinator change, then we unmap - // the key so that we retry the `FindCoordinator` request - log.debug("`{}` request for the member {} in group {} returned error {}. " + - "Will attempt to find the coordinator again and retry", apiName(), memberId, groupId, error); - groupsToUnmap.add(groupId); - break; - case FENCED_INSTANCE_ID: - case UNKNOWN_MEMBER_ID: - log.debug("`{}` request for the member {} in group {} returned error {}.", apiName(), memberId, groupId, error); - break; - default: - log.debug("`{}` request for the member {} in group {} returned unexpected error {}.", - apiName(), memberId, groupId, error); - } - } - } \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 088869346332f..64eb9b453ec37 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -3574,7 +3574,7 @@ public void testRemoveMembersFromGroupNumRetries() throws Exception { Collection membersToRemove = Arrays.asList(new MemberToRemove("instance-1"), new MemberToRemove("instance-2")); final RemoveMembersFromConsumerGroupResult result = env.adminClient().removeMembersFromConsumerGroup( - "groupId", new RemoveMembersFromConsumerGroupOptions(membersToRemove)); + GROUP_ID, new RemoveMembersFromConsumerGroupOptions(membersToRemove)); TestUtils.assertFutureError(result.all(), TimeoutException.class); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java index 076171bb61008..6f5dfda5bc307 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandlerTest.java @@ -70,14 +70,11 @@ public void testSuccessfulHandleResponse() { public void testUnmappedHandleResponse() { assertUnmapped(handleWithGroupError(Errors.COORDINATOR_NOT_AVAILABLE)); assertUnmapped(handleWithGroupError(Errors.NOT_COORDINATOR)); - assertUnmapped(handleWithMemberError(Errors.COORDINATOR_NOT_AVAILABLE)); - assertUnmapped(handleWithMemberError(Errors.NOT_COORDINATOR)); } @Test public void testRetriableHandleResponse() { assertRetriable(handleWithGroupError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); - assertRetriable(handleWithMemberError(Errors.COORDINATOR_LOAD_IN_PROGRESS)); } @Test From a4e75903536f638ac066fbc1ef4d47054cf0bd92 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 14 Jul 2021 20:03:03 +0800 Subject: [PATCH 3/4] KAFKA-13072: refactor --- .../RemoveMembersFromConsumerGroupHandler.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java index 57fe0cb3cd244..3d9de50319523 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java @@ -141,28 +141,28 @@ private void handleGroupError( ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.debug("`LeaveGroup` request for group id {} failed due to error {}", groupId, error); + log.debug("`LeaveGroup` request for group id {} failed due to error {}", groupId.idValue, error); failed.put(groupId, error.exception()); break; case COORDINATOR_LOAD_IN_PROGRESS: // If the coordinator is in the middle of loading, then we just need to retry - log.debug("`LeaveGroup` request for group {} failed because the coordinator " + - "is still in the process of loading state. Will retry", groupId); + log.debug("`LeaveGroup` request for group id {} failed because the coordinator " + + "is still in the process of loading state. Will retry", groupId.idValue); groupsToRetry.add(groupId); break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: // If the coordinator is unavailable or there was a coordinator change, then we unmap // the key so that we retry the `FindCoordinator` request - log.debug("`LeaveGroup` request for group {} returned error {}. " + - "Will attempt to find the coordinator again and retry", groupId, error); + log.debug("`LeaveGroup` request for group id {} returned error {}. " + + "Will attempt to find the coordinator again and retry", groupId.idValue, error); groupsToUnmap.add(groupId); break; default: final String unexpectedErrorMsg = - String.format("`LeaveGroup` request for group id %s failed due to unexpected error %s", groupId, error); + String.format("`LeaveGroup` request for group id %s failed due to unexpected error %s", groupId.idValue, error); log.error(unexpectedErrorMsg); failed.put(groupId, error.exception(unexpectedErrorMsg)); } From 0fb04e9dd851c650284bbb5400f6f8244b9f5981 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Thu, 15 Jul 2021 10:51:19 +0800 Subject: [PATCH 4/4] KAFKA-13072: remove unneeded test and refactor --- ...RemoveMembersFromConsumerGroupHandler.java | 46 ++++--------- .../clients/admin/KafkaAdminClientTest.java | 66 ------------------- 2 files changed, 14 insertions(+), 98 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java index 3d9de50319523..e463911c5d928 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java @@ -91,43 +91,29 @@ public ApiResult> handleResponse( AbstractResponse abstractResponse ) { validateKeys(groupIds); - final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse; - final Map> completed = new HashMap<>(); - final Map failed = new HashMap<>(); - final Set groupsToUnmap = new HashSet<>(); - final Set groupsToRetry = new HashSet<>(); final Errors error = response.topLevelError(); if (error != Errors.NONE) { - handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); + final Map failed = new HashMap<>(); + final Set groupsToUnmap = new HashSet<>(); + + handleGroupError(groupId, error, failed, groupsToUnmap); + + return new ApiResult<>(Collections.emptyMap(), failed, new ArrayList<>(groupsToUnmap)); } else { final Map memberErrors = new HashMap<>(); for (MemberResponse memberResponse : response.memberResponses()) { - Errors memberError = Errors.forCode(memberResponse.errorCode()); - String memberId = memberResponse.memberId(); - memberErrors.put(new MemberIdentity() - .setMemberId(memberId) + .setMemberId(memberResponse.memberId()) .setGroupInstanceId(memberResponse.groupInstanceId()), - memberError); - + Errors.forCode(memberResponse.errorCode())); } - completed.put(groupId, memberErrors); - } - if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { - return new ApiResult<>( - completed, - failed, - Collections.emptyList() - ); - } else { - // retry the request, so don't send completed/failed results back return new ApiResult<>( + Collections.singletonMap(groupId, memberErrors), Collections.emptyMap(), - Collections.emptyMap(), - new ArrayList<>(groupsToUnmap) + Collections.emptyList() ); } } @@ -136,8 +122,7 @@ private void handleGroupError( CoordinatorKey groupId, Errors error, Map failed, - Set groupsToUnmap, - Set groupsToRetry + Set groupsToUnmap ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: @@ -149,7 +134,6 @@ private void handleGroupError( // If the coordinator is in the middle of loading, then we just need to retry log.debug("`LeaveGroup` request for group id {} failed because the coordinator " + "is still in the process of loading state. Will retry", groupId.idValue); - groupsToRetry.add(groupId); break; case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: @@ -161,11 +145,9 @@ private void handleGroupError( break; default: - final String unexpectedErrorMsg = - String.format("`LeaveGroup` request for group id %s failed due to unexpected error %s", groupId.idValue, error); - log.error(unexpectedErrorMsg); - failed.put(groupId, error.exception(unexpectedErrorMsg)); + log.error("`LeaveGroup` request for group id {} failed due to unexpected error {}", groupId.idValue, error); + failed.put(groupId, error.exception()); } } -} \ No newline at end of file +} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 64eb9b453ec37..86d019f11cdfc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -3685,72 +3685,6 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception { } } - @Test - public void testRemoveMembersFromGroupRetriableErrorsInMemberResponse() throws Exception { - // Retriable errors should be retried - String groupId = "instance-1"; - - try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - - env.kafkaClient().prepareResponse( - prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - - MemberResponse memberResponse = new MemberResponse() - .setGroupInstanceId(groupId) - .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()); - - env.kafkaClient().prepareResponse( - new LeaveGroupResponse(new LeaveGroupResponseData() - .setErrorCode(Errors.NONE.code()) - .setMembers(Collections.singletonList(memberResponse)))); - - /* - * We need to return two responses here, one for NOT_COORDINATOR call when calling remove member - * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a - * FindCoordinatorResponse. - * - * And the same reason for the following COORDINATOR_NOT_AVAILABLE error response - */ - - memberResponse.setErrorCode(Errors.NOT_COORDINATOR.code()); - - env.kafkaClient().prepareResponse( - new LeaveGroupResponse(new LeaveGroupResponseData() - .setErrorCode(Errors.NONE.code()) - .setMembers(Collections.singletonList(memberResponse)))); - - env.kafkaClient().prepareResponse( - prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - - memberResponse.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()); - - env.kafkaClient().prepareResponse( - new LeaveGroupResponse(new LeaveGroupResponseData() - .setErrorCode(Errors.NONE.code()) - .setMembers(Collections.singletonList(memberResponse)))); - - env.kafkaClient().prepareResponse( - prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - - memberResponse.setErrorCode(Errors.NONE.code()); - - env.kafkaClient().prepareResponse( - new LeaveGroupResponse(new LeaveGroupResponseData() - .setErrorCode(Errors.NONE.code()) - .setMembers(Collections.singletonList(memberResponse)))); - - MemberToRemove memberToRemove = new MemberToRemove("instance-1"); - Collection membersToRemove = singletonList(memberToRemove); - - final RemoveMembersFromConsumerGroupResult result = env.adminClient().removeMembersFromConsumerGroup( - GROUP_ID, new RemoveMembersFromConsumerGroupOptions(membersToRemove)); - - assertNull(result.all().get()); - assertNull(result.memberResult(memberToRemove).get()); - } - } - @Test public void testRemoveMembersFromGroupNonRetriableErrors() throws Exception { // Non-retriable errors throw an exception