Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleResponse(
List<CoordinatorKey> unmapped = new ArrayList<>();

Map<TopicPartition, Errors> partitions = new HashMap<>();
int totalPartitionCount = 0;
for (OffsetCommitResponseTopic topic : response.data().topics()) {
for (OffsetCommitResponsePartition partition : topic.partitions()) {
TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
Expand All @@ -120,9 +121,14 @@ public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleResponse(
} else {
partitions.put(tp, error);
}
totalPartitionCount++;
}
}
if (failed.isEmpty() && unmapped.isEmpty())
// only complete this request when:
// 1. no fail
// 2. no unmapped
// 3. all partitions are handled (i.e. no need to retry)
if (failed.isEmpty() && unmapped.isEmpty() && partitions.size() == totalPartitionCount)
completed.put(groupId, partitions);

return new ApiResult<>(completed, failed, unmapped);
Expand All @@ -136,21 +142,28 @@ private void handleError(
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
log.error("Received authorization failure for group {} in `OffsetCommit` response", groupId,
error.exception());
log.error("Received authorization failure for group {} in `{}` response", groupId,
apiName(), error.exception());
failed.put(groupId, error.exception());
break;
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`{}` request for group {} failed because the coordinator" +
" is still in the process of loading state. Will retry.", apiName(), groupId);
break;
case COORDINATOR_NOT_AVAILABLE:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand it, we also retry if we get a COORDINATOR_NOT_AVAILABLE exception. Can we add a DEBUG level log statement stating this and that we will retry? Will be good to mimic what we do for the other retriable errors for COORDINATOR_NOT_AVAILABLE errors.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we'll retry when COORDINATOR_NOT_AVAILABLE error. And we already log it below:

            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                // If the coordinator is unavailable or there was a coordinator change, then we unmap
                // the key so that we retry the `FindCoordinator` request
                log.debug("`{}` request for group {} returned error {}. " +
                    "Will attempt to find the coordinator again and retry.", apiName(), groupId, error);
                unmapped.add(groupId);
                break;

We used a general way (with error variable), to log when either COORDINATOR_NOT_AVAILABLE or NOT_COORDINATOR error happened.

I think it should be fine unless you have other suggestion. Thank you. :)

case NOT_COORDINATOR:
log.debug("OffsetCommit request for group {} returned error {}. Will retry", groupId, error);
// If the coordinator is unavailable or there was a coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`{}` request for group {} returned error {}. " +
"Will attempt to find the coordinator again and retry.", apiName(), groupId, error);
unmapped.add(groupId);
break;
default:
log.error("Received unexpected error for group {} in `OffsetCommit` response",
groupId, error.exception());
failed.put(groupId, error.exception(
"Received unexpected error for group " + groupId + " in `OffsetCommit` response"));
final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response",
groupId, apiName());
log.error(unexpectedErrorMsg, error.exception());
failed.put(groupId, error.exception(unexpectedErrorMsg));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error handling bugs me a bit. failed will result in failing the future eventually, right? The issue is that we received errors either for the group and for the partition in the response and we don't really differentiate them here. For instance, imagine that a partition has an UNKNOWN_TOPIC_OR_PARTITION error. Putting it to failed with the groupId could fail the future and thus results in providing that error for all partitions. This is not correct.

It seems to me that we should differentiate the group level errors from the partition level errors here or we should consider all of them as partition level errors. What do you think?

Also, I think that we should handle all the expected errors here. The default error message here is wrong. There are many errors which expect but which are not handled.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dajac , thanks for your comment.

It seems to me that we should differentiate the group level errors from the partition level errors here or we should consider all of them as partition level errors. What do you think?

I agree with you. I think what KIP-699 did, is trying to not break existing tests. It indeed needs improvement.

Also, I think that we should handle all the expected errors here. The default error message here is wrong. There are many errors which expect but which are not handled.

You're right.

I'm thinking we can handle them in separate PR, and open a Jira ticket to track it. And due to V3.0 is released, maybe that improvement can go into next release. What do you think?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@showuon I actually believe that we have regressed here. The admin api returns wrong results. I just tried with a small unit test:

    @Test
    public void testOffsetCommitErrors() throws Exception {
        final Cluster cluster = mockCluster(3, 0);
        final Time time = new MockTime();

        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
            AdminClientConfig.RETRIES_CONFIG, "0")) {
            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

            final TopicPartition foo0 = new TopicPartition("foo", 0);
            final TopicPartition foo1 = new TopicPartition("foo", 1);

            env.kafkaClient().prepareResponse(
                prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

            Map<TopicPartition, Errors> responseData = new HashMap<>();
            responseData.put(foo0, Errors.NONE);
            responseData.put(foo1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
            env.kafkaClient().prepareResponse(new OffsetCommitResponse(0, responseData));

            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            offsets.put(foo0, new OffsetAndMetadata(123L));
            offsets.put(foo1, new OffsetAndMetadata(456L));
            final AlterConsumerGroupOffsetsResult result = env.adminClient()
                .alterConsumerGroupOffsets("group", offsets);

            assertNull(result.partitionResult(foo0).get());
            TestUtils.assertFutureError(result.partitionResult(foo1), UnknownTopicOrPartitionException.class);

            TestUtils.assertFutureError(result.all(), UnknownTopicOrPartitionException.class);
        }
    }

It works with 2.8 but fails with trunk. In trunk, foo0 is also failed with UnknownTopicOrPartitionException. This is because we don't handle the partition level correctly. Could you double check if I am correct here? If you confirm, I will raise a blocker for 3.0.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have opened a PR for this here: #11016.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,18 @@ public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleResponse(

final Errors error = Errors.forCode(response.data().errorCode());
if (error != Errors.NONE) {
handleError(groupId, error, failed, unmapped);
handleGroupLevelError(groupId, error, failed, unmapped);
} else {
final Map<TopicPartition, Errors> partitions = new HashMap<>();
response.data().topics().forEach(topic ->
topic.partitions().forEach(partition -> {
Errors partitionError = Errors.forCode(partition.errorCode());
if (!handleError(groupId, partitionError, failed, unmapped)) {
partitions.put(new TopicPartition(topic.name(), partition.partitionIndex()), partitionError);
topic.partitions().forEach(partitionOffsetDeleteResponse -> {
Errors partitionError = Errors.forCode(partitionOffsetDeleteResponse.errorCode());
TopicPartition tp = new TopicPartition(topic.name(), partitionOffsetDeleteResponse.partitionIndex());
if (log.isDebugEnabled() && partitionError != Errors.NONE) {
log.debug("`{}` request for group {} returned error {} in the partition {}.",
apiName(), groupId, partitionError, tp);
}
partitions.put(tp, partitionError);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For partition level error, we don't handle it, just put them into completion result.

})
);
if (!partitions.isEmpty())
Expand All @@ -121,7 +124,7 @@ public ApiResult<CoordinatorKey, Map<TopicPartition, Errors>> handleResponse(
return new ApiResult<>(completed, failed, unmapped);
}

private boolean handleError(
private void handleGroupLevelError(
CoordinatorKey groupId,
Errors error,
Map<CoordinatorKey, Throwable> failed,
Expand All @@ -131,20 +134,29 @@ private boolean handleError(
case GROUP_AUTHORIZATION_FAILED:
case GROUP_ID_NOT_FOUND:
case INVALID_GROUP_ID:
log.error("Received non retriable error for group {} in `DeleteConsumerGroupOffsets` response", groupId,
error.exception());
case NON_EMPTY_GROUP:
log.error("Received non retriable error for group {} in `{}` response", groupId,
apiName(), error.exception());
failed.put(groupId, error.exception());
return true;
break;
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`{}` request for group {} failed because the coordinator" +
" is still in the process of loading state. Will retry.", apiName(), groupId);
break;
case COORDINATOR_NOT_AVAILABLE:
return true;
case NOT_COORDINATOR:
log.debug("DeleteConsumerGroupOffsets request for group {} returned error {}. Will retry",
groupId, error);
// If the coordinator is unavailable or there was a coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`{}` request for group {} returned error {}. " +
"Will attempt to find the coordinator again and retry.", apiName(), groupId, error);
unmapped.add(groupId);
return true;
break;
default:
return false;
final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response",
groupId, apiName());
log.error(unexpectedErrorMsg, error.exception());
failed.put(groupId, error.exception());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,34 @@ private void handleError(
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
log.error("Received authorization failure for group {} in `DeleteConsumerGroups` response", groupId,
error.exception());
log.error("Received authorization failure for group {} in `{}` response", groupId,
apiName(), error.exception());
failed.put(groupId, error.exception());
break;
case INVALID_GROUP_ID:
case NON_EMPTY_GROUP:
case GROUP_ID_NOT_FOUND:
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handle all possible errors well.

log.error("Received non retriable failure for group {} in `{}` response", groupId,
apiName(), error.exception());
failed.put(groupId, error.exception());
break;
case COORDINATOR_LOAD_IN_PROGRESS:
case COORDINATOR_NOT_AVAILABLE:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`{}` request for group {} failed because the coordinator " +
"is still in the process of loading state. Will retry", apiName(), groupId);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
log.debug("DeleteConsumerGroups request for group {} returned error {}. Will retry",
groupId, error);
// If the coordinator is unavailable or there was a coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`{}` request for group {} returned error {}. " +
"Will attempt to find the coordinator again and retry", apiName(), groupId, error);
unmapped.add(groupId);
break;
default:
log.error("Received unexpected error for group {} in `DeleteConsumerGroups` response",
groupId, error.exception());
final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response",
groupId, apiName());
log.error(unexpectedErrorMsg, error.exception());
failed.put(groupId, error.exception());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,23 +166,28 @@ private void handleError(
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
log.error("Received authorization failure for group {} in `DescribeGroups` response", groupId,
error.exception());
log.error("Received authorization failure for group {} in `{}` response", groupId,
apiName(), error.exception());
failed.put(groupId, error.exception());
break;
case COORDINATOR_LOAD_IN_PROGRESS:
case COORDINATOR_NOT_AVAILABLE:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`{}` request for group {} failed because the coordinator " +
"is still in the process of loading state. Will retry", apiName(), groupId);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
log.debug("DescribeGroups request for group {} returned error {}. Will retry",
groupId, error);
// If the coordinator is unavailable or there was a coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`{}` request for group {} returned error {}. " +
"Will attempt to find the coordinator again and retry", apiName(), groupId, error);
unmapped.add(groupId);
break;
default:
log.error("Received unexpected error for group {} in `DescribeGroups` response",
groupId, error.exception());
failed.put(groupId, error.exception(
"Received unexpected error for group " + groupId + " in `DescribeGroups` response"));
final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response",
groupId, apiName());
log.error(unexpectedErrorMsg, error.exception());
failed.put(groupId, error.exception(unexpectedErrorMsg));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,34 +159,34 @@ private void handleError(
switch (error) {
case TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
failed.put(transactionalIdKey, new TransactionalIdAuthorizationException(
"DescribeTransactions request for transactionalId `" + transactionalIdKey.idValue + "` " +
apiName() + " request for transactionalId `" + transactionalIdKey.idValue + "` " +
"failed due to authorization failure"));
break;

case TRANSACTIONAL_ID_NOT_FOUND:
failed.put(transactionalIdKey, new TransactionalIdNotFoundException(
"DescribeTransactions request for transactionalId `" + transactionalIdKey.idValue + "` " +
apiName() + " request for transactionalId `" + transactionalIdKey.idValue + "` " +
"failed because the ID could not be found"));
break;

case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("DescribeTransactions request for transactionalId `{}` failed because the " +
log.debug("{} request for transactionalId `{}` failed because the " +
"coordinator is still in the process of loading state. Will retry",
transactionalIdKey.idValue);
apiName(), transactionalIdKey.idValue);
break;

case NOT_COORDINATOR:
case COORDINATOR_NOT_AVAILABLE:
// If the coordinator is unavailable or there was a coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
unmapped.add(transactionalIdKey);
log.debug("DescribeTransactions request for transactionalId `{}` returned error {}. Will attempt " +
"to find the coordinator again and retry", transactionalIdKey.idValue, error);
log.debug("{} request for transactionalId `{}` returned error {}. Will attempt " +
"to find the coordinator again and retry", apiName(), transactionalIdKey.idValue, error);
break;

default:
failed.put(transactionalIdKey, error.exception("DescribeTransactions request for " +
failed.put(transactionalIdKey, error.exception(apiName() + " request for " +
"transactionalId `" + transactionalIdKey.idValue + "` failed due to unexpected error"));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,23 +125,30 @@ private void handleError(
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
log.error("Received authorization failure for group {} in `OffsetFetch` response", groupId,
error.exception());
log.error("Received authorization failure for group {} in `{}` response", groupId,
apiName(), error.exception());
failed.put(groupId, error.exception());
break;

case COORDINATOR_LOAD_IN_PROGRESS:
case COORDINATOR_NOT_AVAILABLE:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("`{}` request for group {} failed because the coordinator " +
"is still in the process of loading state. Will retry", apiName(), groupId);
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
log.debug("OffsetFetch request for group {} returned error {}. Will retry",
groupId, error);
// If the coordinator is unavailable or there was a coordinator change, then we unmap
// the key so that we retry the `FindCoordinator` request
log.debug("`{}` request for group {} returned error {}. " +
"Will attempt to find the coordinator again and retry", apiName(), groupId, error);
unmapped.add(groupId);
break;

default:
log.error("Received unexpected error for group {} in `OffsetFetch` response",
groupId, error.exception());
failed.put(groupId, error.exception(
"Received unexpected error for group " + groupId + " in `OffsetFetch` response"));
final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response",
groupId, apiName());
log.error(unexpectedErrorMsg, error.exception());
failed.put(groupId, error.exception(unexpectedErrorMsg));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,18 @@ public ApiResult<AllBrokersStrategy.BrokerKey, Collection<TransactionListing>> h
Errors error = Errors.forCode(response.data().errorCode());

if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
log.debug("The `ListTransactions` request sent to broker {} failed because the " +
"coordinator is still loading state. Will try again after backing off", brokerId);
log.debug("The `{}` request sent to broker {} failed because the " +
"coordinator is still loading state. Will try again after backing off", apiName(), brokerId);
return ApiResult.empty();
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE) {
log.debug("The `ListTransactions` request sent to broker {} failed because the " +
"coordinator is shutting down", brokerId);
return ApiResult.failed(key, new CoordinatorNotAvailableException("ListTransactions " +
"request sent to broker " + brokerId + " failed because the coordinator is shutting down"));
log.debug("The `{}` request sent to broker {} failed because the " +
"coordinator is shutting down", apiName(), brokerId);
return ApiResult.failed(key, new CoordinatorNotAvailableException(apiName() +
" request sent to broker " + brokerId + " failed because the coordinator is shutting down"));
} else if (error != Errors.NONE) {
log.error("The `ListTransactions` request sent to broker {} failed because of an " +
"unexpected error {}", brokerId, error);
return ApiResult.failed(key, error.exception("ListTransactions request " +
log.error("The `{}` request sent to broker {} failed because of an " +
"unexpected error {}", apiName(), brokerId, error);
return ApiResult.failed(key, error.exception(apiName() + " request " +
"sent to broker " + brokerId + " failed with an unexpected exception"));
} else {
List<TransactionListing> listings = response.data().transactionStates().stream()
Expand Down
Loading