KAFKA-13033: COORDINATOR_NOT_AVAILABLE should be unmapped #10973

Closed

showuon wants to merge 6 commits into apache:trunk from showuon:KAFKA-13033

Conversation

@showuon
Member

@showuon showuon commented Jul 4, 2021

In KIP-699, we added handlers for the different types of admin operations. In their handleError methods, we didn't treat COORDINATOR_NOT_AVAILABLE as unmapped, which would trigger a coordinator re-lookup. In DescribeTransactionsHandler, Jason Gustafson already explained why COORDINATOR_NOT_AVAILABLE and NOT_COORDINATOR should be listed as unmapped while COORDINATOR_LOAD_IN_PROGRESS should not:

case COORDINATOR_LOAD_IN_PROGRESS:
    // If the coordinator is in the middle of loading, then we just need to retry
    log.debug("DescribeTransactions request for transactionalId `{}` failed because the " +
            "coordinator is still in the process of loading state. Will retry",
        transactionalIdKey.idValue);
    break;

case NOT_COORDINATOR:
case COORDINATOR_NOT_AVAILABLE:
    // If the coordinator is unavailable or there was a coordinator change, then we unmap
    // the key so that we retry the `FindCoordinator` request
    unmapped.add(transactionalIdKey);
    log.debug("DescribeTransactions request for transactionalId `{}` returned error {}. Will attempt " +
            "to find the coordinator again and retry", transactionalIdKey.idValue, error);
    break;

We should be consistent with it. This PR fixes that, adds logs and comments, and also updates the tests.
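
For illustration, the target pattern in the consumer-group handlers would look roughly like the following, mirroring the DescribeTransactionsHandler snippet above with a group id key instead of a transactional id (exact log wording may differ from the final patch):

case COORDINATOR_LOAD_IN_PROGRESS:
    // If the coordinator is in the middle of loading, then we just need to retry
    log.debug("`{}` request for group {} failed because the coordinator " +
        "is still in the process of loading state. Will retry.", apiName(), groupId);
    break;

case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
    // If the coordinator is unavailable or there was a coordinator change, then we unmap
    // the key so that we retry the `FindCoordinator` request
    log.debug("`{}` request for group {} returned error {}. " +
        "Will attempt to find the coordinator again and retry.", apiName(), groupId, error);
    unmapped.add(groupId);
    break;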

The changed handler list:

AlterConsumerGroupOffsetsHandler
DeleteConsumerGroupOffsetsHandler
DeleteConsumerGroupsHandler
DescribeConsumerGroupsHandler
ListConsumerGroupOffsetsHandler
RemoveMembersFromConsumerGroupHandler

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@showuon
Member Author

showuon commented Jul 4, 2021

@skaundinya15 @mimaison @dajac , could you please check the PR? Thanks.

@dajac
Member

dajac commented Jul 4, 2021

@showuon Thanks. Good catch! I will review it tomorrow.

@mimaison
Member

mimaison commented Jul 4, 2021

Yes, we should handle errors consistently, but as part of KIP-699 I ensured all existing KafkaAdminClientTest tests kept working to avoid changing behaviour; see the discussion in #10743 (comment). I opened https://issues.apache.org/jira/browse/KAFKA-13012 to address it.

This change makes sense but you'll also need to update the *RetriableErrors() tests in KafkaAdminClientTest.

@showuon showuon force-pushed the KAFKA-13033 branch 2 times, most recently from 5977676 to eee72e7 on July 5, 2021 07:18
@showuon
Member Author

showuon commented Jul 5, 2021

@mimaison , thanks for the comments. I've updated the KafkaAdminClientTest. Please take a look again. Thanks.

Failed tests are unrelated.

    Build / JDK 8 and Scala 2.12 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
    Build / JDK 8 and Scala 2.12 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
    Build / JDK 8 and Scala 2.12 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
    Build / JDK 11 and Scala 2.13 / kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()

Contributor

@skaundinya15 skaundinya15 left a comment

Thanks for the PR @showuon, left a couple of comments. Overall the patch makes sense to me. It would be good if we can get this fix into 3.0 too. Also, it looks like there are some conflicts in KafkaAdminClientTest.java, probably best to rebase against trunk.

break;
case COORDINATOR_LOAD_IN_PROGRESS:
// If the coordinator is in the middle of loading, then we just need to retry
log.debug("OffsetCommit request for group {} failed because the coordinator" +
Contributor

Could we use the apiName function for the request in the logs? Would be good to change it here and in all the other handler classes.

Member Author

Good suggestion! Updated!

log.debug("OffsetCommit request for group {} failed because the coordinator" +
" is still in the process of loading state. Will retry. Will retry", groupId);
break;
case COORDINATOR_NOT_AVAILABLE:
Contributor

As I understand it, we also retry if we get a COORDINATOR_NOT_AVAILABLE exception. Can we add a DEBUG level log statement stating this and that we will retry? It would be good to mimic what we do for the other retriable errors for COORDINATOR_NOT_AVAILABLE errors.

Member Author

Yes, we'll retry on a COORDINATOR_NOT_AVAILABLE error, and we already log it below:

            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                // If the coordinator is unavailable or there was a coordinator change, then we unmap
                // the key so that we retry the `FindCoordinator` request
                log.debug("`{}` request for group {} returned error {}. " +
                    "Will attempt to find the coordinator again and retry.", apiName(), groupId, error);
                unmapped.add(groupId);
                break;

We use a general log message (with the error variable) that covers both the COORDINATOR_NOT_AVAILABLE and NOT_COORDINATOR cases.

I think it should be fine unless you have other suggestions. Thank you. :)

@showuon
Member Author

showuon commented Jul 8, 2021

Failed tests are unrelated:

    Build / ARM / kafka.raft.KafkaMetadataLogTest.testDeleteSnapshots()
    Build / ARM / kafka.raft.KafkaMetadataLogTest.testDeleteSnapshots()
    Build / JDK 16 and Scala 2.13 / kafka.api.ConsumerBounceTest.testCloseDuringRebalance()
    Build / JDK 16 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
    Build / JDK 16 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
    Build / JDK 16 and Scala 2.13 / org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable
    Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsBounceTest.testWithGroupId()
    Build / JDK 11 and Scala 2.13 / kafka.api.TransactionsTest.testAbortTransactionTimeout()
    Build / JDK 11 and Scala 2.13 / kafka.network.ConnectionQuotasTest.testNonDefaultConnectionCountLimitAndRateLimit()
    Build / JDK 11 and Scala 2.13 / kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
    Build / JDK 8 and Scala 2.12 / kafka.raft.KafkaMetadataLogTest.testDeleteSnapshots()
    Build / JDK 8 and Scala 2.12 / kafka.raft.KafkaMetadataLogTest.testDeleteSnapshots()

@showuon
Member Author

showuon commented Jul 8, 2021

@skaundinya15 , please take a look again when available. Thank you.

final String unexpectedErrorMsg = String.format("Received unexpected error for group %s in `%s` response",
groupId, apiName());
log.error(unexpectedErrorMsg, error.exception());
failed.put(groupId, error.exception(unexpectedErrorMsg));
Member

The error handling bugs me a bit. failed will result in failing the future eventually, right? The issue is that the response contains errors both for the group and for the partitions, and we don't really differentiate them here. For instance, imagine that a partition has an UNKNOWN_TOPIC_OR_PARTITION error. Putting it into failed with the groupId could fail the future and thus report that error for all partitions. This is not correct.

It seems to me that we should differentiate the group level errors from the partition level errors here, or we should consider all of them as partition level errors. What do you think?

Also, I think that we should handle all the expected errors here. The default error message here is wrong. There are many errors which we expect but which are not handled.
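
For illustration only, a simplified standalone sketch of the distinction (not the actual AdminApiHandler code; the class and field names here are hypothetical):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;

// Sketch: split per-partition errors from an OffsetCommit-style response into
// partition-level failures and group-level signals that require a coordinator re-lookup.
final class OffsetCommitErrorSplit {
    final Map<TopicPartition, Throwable> partitionFailures = new HashMap<>();
    final Set<String> groupsToRemap = new HashSet<>();

    void handle(String groupId, Map<TopicPartition, Errors> responseErrors) {
        for (Map.Entry<TopicPartition, Errors> entry : responseErrors.entrySet()) {
            Errors error = entry.getValue();
            switch (error) {
                case NONE:
                    // This partition succeeded; nothing to record.
                    break;
                case COORDINATOR_NOT_AVAILABLE:
                case NOT_COORDINATOR:
                    // Group-level condition: re-discover the coordinator and retry the whole group.
                    groupsToRemap.add(groupId);
                    break;
                default:
                    // Partition-level error (e.g. UNKNOWN_TOPIC_OR_PARTITION): only this
                    // partition's future should fail, not the whole group.
                    partitionFailures.put(entry.getKey(), error.exception());
            }
        }
    }
}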

Member Author

@dajac , thanks for your comment.

It seems to me that we should differentiate the group level errors from the partition level errors here or we should consider all of them as partition level errors. What do you think?

I agree with you. I think KIP-699 tried not to break the existing tests, but the error handling indeed needs improvement.

Also, I think that we should handle all the expected errors here. The default error message here is wrong. There are many errors which expect but which are not handled.

You're right.

I'm thinking we can handle them in a separate PR and open a Jira ticket to track it. Given the 3.0 release timing, maybe that improvement can go into the next release. What do you think?

Member

@showuon I actually believe that we have regressed here. The admin API returns wrong results. I just tried it with a small unit test:

    @Test
    public void testOffsetCommitErrors() throws Exception {
        final Cluster cluster = mockCluster(3, 0);
        final Time time = new MockTime();

        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
            AdminClientConfig.RETRIES_CONFIG, "0")) {
            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

            final TopicPartition foo0 = new TopicPartition("foo", 0);
            final TopicPartition foo1 = new TopicPartition("foo", 1);

            env.kafkaClient().prepareResponse(
                prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

            Map<TopicPartition, Errors> responseData = new HashMap<>();
            responseData.put(foo0, Errors.NONE);
            responseData.put(foo1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
            env.kafkaClient().prepareResponse(new OffsetCommitResponse(0, responseData));

            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            offsets.put(foo0, new OffsetAndMetadata(123L));
            offsets.put(foo1, new OffsetAndMetadata(456L));
            final AlterConsumerGroupOffsetsResult result = env.adminClient()
                .alterConsumerGroupOffsets("group", offsets);

            assertNull(result.partitionResult(foo0).get());
            TestUtils.assertFutureError(result.partitionResult(foo1), UnknownTopicOrPartitionException.class);

            TestUtils.assertFutureError(result.all(), UnknownTopicOrPartitionException.class);
        }
    }

It works with 2.8 but fails on trunk. On trunk, foo0 also fails with UnknownTopicOrPartitionException. This is because we don't handle the partition level correctly. Could you double-check whether I am correct here? If you confirm, I will raise a blocker for 3.0.

Member

I have opened a PR for this here: #11016.

@dajac
Member

dajac commented Jul 10, 2021

@showuon Thanks for the patch. I have started to look at it. I have left a comment which is not really related to your PR. I think that we got the error handling wrong in some places in KIP-699.

@showuon
Member Author

showuon commented Jul 11, 2021

@dajac, thanks for letting me know. I tested it today and confirmed that it's a regression. I also confirmed that there's no such regression in DeleteConsumerGroupOffsetsHandler. However, I agree with you: we should

  1. differentiate the group level errors from the partition level errors
  2. handle all expected errors well.

So I checked and updated DeleteConsumerGroupOffsetsHandler and DeleteConsumerGroupsHandler. AlterConsumerGroupOffsetsHandler will be handled in your PR, so I kept it as is. The others are fine. Thank you.

log.debug("`{}` request for group {} returned error {} in the partition {}.",
apiName(), groupId, partitionError, tp);
}
partitions.put(tp, partitionError);
Member Author

For partition-level errors, we don't handle them specially; we just put them into the completion result.

break;
case INVALID_GROUP_ID:
case NON_EMPTY_GROUP:
case GROUP_ID_NOT_FOUND:
Member Author

Handle all the expected errors explicitly.
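
For example, the explicit handling could look roughly like this (mirroring the failed.put pattern quoted above; the log wording is illustrative and may differ from the actual patch):

case INVALID_GROUP_ID:
case NON_EMPTY_GROUP:
case GROUP_ID_NOT_FOUND:
    // Non-retriable group-level errors: fail this group's future explicitly instead of
    // letting it fall through to the generic "unexpected error" default branch.
    log.debug("`{}` request for group {} failed due to error {}.", apiName(), groupId, error);
    failed.put(groupId, error.exception());
    break;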

@dajac
Member

dajac commented Jul 11, 2021

@showuon I wonder if we could split up this PR by handler. We've already had a few regressions related to the KIP-699 changes, so I'd like to be extra careful and review them all separately. Also, I would avoid changing the transaction handlers for now as they are fine. We could follow up with a PR for the cosmetic changes afterwards. What do you think?

@showuon
Member Author

showuon commented Jul 11, 2021

I wonder if we could split up this PR by handler.

Sounds good to me. I can split them tomorrow (my time). There are 6 handlers added in KIP-699; I'll split this into 5 PRs, which, plus #11016, will cover all the changes. Thank you.

@dajac
Member

dajac commented Jul 11, 2021

Sounds good, thanks! I will review them directly. Let's get them in 3.0.

@showuon
Member Author

showuon commented Jul 12, 2021

@dajac I've created 4 PRs. The last one will be created tomorrow.

In addition to making the coordinator error handling consistent, I also made sure the response-handling logic stays the same as before, and added test coverage. Thank you.

@dajac
Member

dajac commented Jul 12, 2021

@showuon Thanks. I will start reviewing them tomorrow.

@showuon
Member Author

showuon commented Jul 13, 2021

I've split this PR into 5 sub-PRs:

#11019 for DeleteConsumerGroupOffsetsHandler
#11021 for DeleteConsumerGroupsHandler
#11022 for DescribeConsumerGroupsHandler
#11026 for ListConsumerGroupOffsetsHandler
#11035 for RemoveMembersFromConsumerGroupHandler

Please take a look. Thanks.

Also, I'm going to close this PR if there are no objections.

Thank you.

@showuon showuon closed this Jul 15, 2021
@mimaison
Member

Sorry I was away for a bit. Thanks @showuon and @dajac for following up on KIP-699!
