Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -86,47 +87,54 @@ public ApiResult<CoordinatorKey, Void> handleResponse(
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse
) {
DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
Map<CoordinatorKey, Void> completed = new HashMap<>();
Map<CoordinatorKey, Throwable> failed = new HashMap<>();
List<CoordinatorKey> unmapped = new ArrayList<>();
final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse;
final Map<CoordinatorKey, Void> completed = new HashMap<>();
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();

for (DeletableGroupResult deletedGroup : response.data().results()) {
CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(deletedGroup.groupId());
Errors error = Errors.forCode(deletedGroup.errorCode());
if (error != Errors.NONE) {
handleError(groupIdKey, error, failed, unmapped);
handleError(groupIdKey, error, failed, groupsToUnmap);
continue;
}

completed.put(groupIdKey, null);
}
return new ApiResult<>(completed, failed, unmapped);

return new ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap));
}

private void handleError(
CoordinatorKey groupId,
Errors error,
Map<CoordinatorKey, Throwable> failed,
List<CoordinatorKey> unmapped
Set<CoordinatorKey> groupsToUnmap
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
log.error("Received authorization failure for group {} in `DeleteConsumerGroups` response", groupId,
error.exception());
case INVALID_GROUP_ID:
case NON_EMPTY_GROUP:
case GROUP_ID_NOT_FOUND:
log.debug("`DeleteConsumerGroups` request for group id {} failed due to error {}", groupId.idValue, error);
failed.put(groupId, error.exception());
break;
case COORDINATOR_LOAD_IN_PROGRESS:
case COORDINATOR_NOT_AVAILABLE:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`DeleteConsumerGroups` request for group id {} failed because the coordinator " +
"is still in the process of loading state. Will retry", groupId.idValue);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
log.debug("DeleteConsumerGroups request for group {} returned error {}. Will retry",
groupId, error);
unmapped.add(groupId);
// If the coordinator is unavailable or there was a coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`DeleteConsumerGroups` request for group id {} returned error {}. " +
"Will attempt to find the coordinator again and retry", groupId.idValue, error);
groupsToUnmap.add(groupId);
break;
default:
log.error("Received unexpected error for group {} in `DeleteConsumerGroups` response",
groupId, error.exception());
log.error("`DeleteConsumerGroups` request for group id {} failed due to unexpected error {}", groupId.idValue, error);
failed.put(groupId, error.exception());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3179,41 +3179,47 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception {
env.kafkaClient().prepareResponse(
prepareOldFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode()));

final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
TestUtils.assertFutureError(errorResult.deletedGroups().get("groupId"), GroupAuthorizationException.class);

//Retriable errors should be retried
// Retriable errors should be retried
env.kafkaClient().prepareResponse(
prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

final DeletableGroupResultCollection errorResponse1 = new DeletableGroupResultCollection();
errorResponse1.add(new DeletableGroupResult()
.setGroupId("groupId")
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
);
env.kafkaClient().prepareResponse(new DeleteGroupsResponse(
new DeleteGroupsResponseData()
.setResults(errorResponse1)));
Comment on lines -3189 to -3196
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we moving this to later?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section is testing "retriable" errors should be retried. Before the change, COORDINATOR_NOT_AVAILABLE is considered as retriable error. But after this PR, it'll considered as unmapped error, so it is moved to later, to test when receiving the error, we should re-find coordinator, and then re-send request.


final DeletableGroupResultCollection errorResponse2 = new DeletableGroupResultCollection();
errorResponse2.add(new DeletableGroupResult()
final DeletableGroupResultCollection errorResponse = new DeletableGroupResultCollection();
errorResponse.add(new DeletableGroupResult()
.setGroupId("groupId")
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
);
env.kafkaClient().prepareResponse(new DeleteGroupsResponse(
new DeleteGroupsResponseData()
.setResults(errorResponse2)));
.setResults(errorResponse)));

/*
* We need to return two responses here, one for NOT_COORDINATOR call when calling delete a consumer group
* api using coordinator that has moved. This will retry whole operation. So we need to again respond with a
* FindCoordinatorResponse.
*
* And the same reason for the following COORDINATOR_NOT_AVAILABLE error response
*/
final DeletableGroupResultCollection coordinatorMoved = new DeletableGroupResultCollection();

DeletableGroupResultCollection coordinatorMoved = new DeletableGroupResultCollection();
coordinatorMoved.add(new DeletableGroupResult()
.setGroupId("groupId")
.setErrorCode(Errors.NOT_COORDINATOR.code())
);

env.kafkaClient().prepareResponse(new DeleteGroupsResponse(
new DeleteGroupsResponseData()
.setResults(coordinatorMoved)));
env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

coordinatorMoved = new DeletableGroupResultCollection();
coordinatorMoved.add(new DeletableGroupResult()
.setGroupId("groupId")
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
);

env.kafkaClient().prepareResponse(new DeleteGroupsResponse(
new DeleteGroupsResponseData()
.setResults(coordinatorMoved)));
Expand All @@ -3223,9 +3229,9 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception {
new DeleteGroupsResponseData()
.setResults(validResponse)));

final DeleteConsumerGroupsResult errorResult1 = env.adminClient().deleteConsumerGroups(groupIds);
errorResult = env.adminClient().deleteConsumerGroups(groupIds);

final KafkaFuture<Void> errorResults = errorResult1.deletedGroups().get("groupId");
final KafkaFuture<Void> errorResults = errorResult.deletedGroups().get("groupId");
assertNull(errorResults.get());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
Expand Down Expand Up @@ -57,19 +58,20 @@ public void testSuccessfulHandleResponse() {
@Test
public void testUnmappedHandleResponse() {
assertUnmapped(handleWithError(Errors.NOT_COORDINATOR));
assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
}

@Test
public void testRetriableHandleResponse() {
assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
assertRetriable(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
}

@Test
public void testFailedHandleResponse() {
assertFailed(GroupAuthorizationException.class, handleWithError(Errors.GROUP_AUTHORIZATION_FAILED));
assertFailed(GroupIdNotFoundException.class, handleWithError(Errors.GROUP_ID_NOT_FOUND));
assertFailed(InvalidGroupIdException.class, handleWithError(Errors.INVALID_GROUP_ID));
assertFailed(GroupNotEmptyException.class, handleWithError(Errors.NON_EMPTY_GROUP));
}

private DeleteGroupsResponse buildResponse(Errors error) {
Expand Down