KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot (#10289)
Conversation
Hello @dajac, PTAL.
jsancio
left a comment
Thanks for the PR. One quick comment. I'll look at the rest of the PR later this week.
It is sad that we have to add KafkaRaftClient to this list. Do you know what exactly pushed this over the threshold? This would allow us to look into ways to re-organize the code so that it is not so complex.
The handleVoteRequest() method has too many if conditions.
I agree it is unfortunate. There are probably ways we can improve this. For example, this logic smells a little bit:
```java
if (quorum.isLeader()) {
    logger.debug("Rejecting vote request {} with epoch {} since we are already leader on that epoch",
        request, candidateEpoch);
    voteGranted = false;
} else if (quorum.isCandidate()) {
    logger.debug("Rejecting vote request {} with epoch {} since we are already candidate on that epoch",
        request, candidateEpoch);
    voteGranted = false;
} else if (quorum.isResigned()) {
    logger.debug("Rejecting vote request {} with epoch {} since we have resigned as candidate/leader in this epoch",
        request, candidateEpoch);
    voteGranted = false;
} else if (quorum.isFollower()) {
    FollowerState state = quorum.followerStateOrThrow();
    logger.debug("Rejecting vote request {} with epoch {} since we already have a leader {} on that epoch",
        request, candidateEpoch, state.leaderId());
    voteGranted = false;
```

It might be possible to push this logic into EpochState, or at least to make use of the name() method in the logging. @dengziming would you be interested in following up on this separately?
Thank you, I will take some time to improve this.
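One possible shape for that follow-up, sketched here with hypothetical names (this is not KafkaRaftClient's actual API): give each state a human-readable name and collapse the four near-identical rejection branches into one log line.

```java
// Hypothetical sketch; Kafka's real per-epoch state classes differ.
public class VoteRejectionSketch {
    // Stand-in for the per-epoch state objects, each exposing a
    // log-friendly name, similar to the existing name() method.
    enum EpochState {
        LEADER("leader"), CANDIDATE("candidate"),
        RESIGNED("resigned"), FOLLOWER("follower");

        private final String stateName;
        EpochState(String stateName) { this.stateName = stateName; }
        String stateName() { return stateName; }
    }

    // One rejection message instead of four duplicated branches.
    static String rejectionReason(EpochState state, int candidateEpoch) {
        return String.format(
            "Rejecting vote request with epoch %d since we are already %s in that epoch",
            candidateEpoch, state.stateName());
    }

    public static void main(String[] args) {
        System.out.println(rejectionReason(EpochState.LEADER, 5));
    }
}
```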
What should we do if we see this error in a response? It looks like it would currently hit handleUnexpectedError, which just logs an error. That might be ok for now. I think there is a window during startup when we could consider these errors to be fatal. This would be helpful for detecting configuration problems. We probably do not want them to be fatal in all cases though, because that might result in a misconfigured node killing a stable cluster.
It's a bit difficult to figure out how to add that window; we cannot simply rely on a fixed configuration. I added a ticket to track this problem: https://issues.apache.org/jira/browse/KAFKA-12465.
We can implement that when handling a response: an invalid cluster id is fatal unless a previous response contained a valid cluster id.
@jsancio This is simple but not perfect. Consider a four-node cluster A-0 (clusterId=A), A-1 (clusterId=A), B-0 (clusterId=B), B-1 (clusterId=B). When starting, they all become candidates and send vote requests to the other nodes. If each node first receives a vote response from a node with the same clusterId as itself, they all live; but if each first receives a vote response from a node with a different clusterId, they are all killed. The logic is similar to leader election, which needs to reach a consensus. So we'd better treat these errors as non-fatal for now and have some discussion to reach a consensus about whether we should treat them as fatal.
Yes. @dengziming, in that example the user has incorrectly configured the cluster: all of the controllers have each other's listener (connection) information, but the cluster ids are different.
The question is: do we want to catch those misconfigurations early by shutting down the brokers/controllers? Or do we want to continue executing, with the user potentially missing that the controllers/brokers are incorrectly configured?
There have been conversations about having the first controller leader generate the cluster id and replicate that information to all of the nodes. The current implementation generates the cluster id in the StorageTool, which the user has to run when configuring the controllers.
I am okay leaving it as is and addressing this in a future PR.
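The earlier "fatal until the first valid response" idea could be sketched as a small latch. The class and method names here are illustrative, not Kafka's actual code.

```java
// Illustrative sketch, not Kafka's implementation: treat a cluster id
// mismatch in a response as fatal only until we have seen at least one
// response whose cluster id matches our own.
public class ClusterIdMismatchLatch {
    private final String localClusterId;
    private boolean sawMatchingClusterId = false;

    public ClusterIdMismatchLatch(String localClusterId) {
        this.localClusterId = localClusterId;
    }

    // Returns true when the mismatch should be treated as fatal.
    public boolean isFatalMismatch(String responseClusterId) {
        if (localClusterId.equals(responseClusterId)) {
            sawMatchingClusterId = true; // latch: later mismatches become non-fatal
            return false;
        }
        return !sawMatchingClusterId;
    }
}
```

In the A/B split-cluster example above, whether a node survives under this rule depends only on whether a matching response happens to arrive before the first mismatch, which illustrates why treating mismatches as non-fatal is the safer default for now.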
Force-pushed 80382b3 to 22f0f76
I think a better way to do this is to modify validateVoterOnlyRequest and validateLeaderOnlyRequest so that we pass the clusterId. Then we can get rid of getClusterId.
I tried this approach. It seems that voter and leader are partition-level terminology, so validateVoterOnlyRequest produces a partition-level error, while clusterId validation is a request-level error. We'd better separate these two errors since we are making way for multi-raft. I changed the getClusterId method to pass the clusterId to it directly. WDYT?
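The separation being described might look roughly like this. The method names and the null-means-skip rule are assumptions for illustration, not the PR's exact code.

```java
import java.util.Set;

// Illustrative sketch of request-level vs. partition-level validation.
public class RaftRequestValidation {
    private final String clusterId;

    public RaftRequestValidation(String clusterId) {
        this.clusterId = clusterId;
    }

    // Request-level check: the whole request is rejected on a mismatch.
    // A null cluster id is accepted here for older clients (assumption).
    public boolean validateClusterId(String requestClusterId) {
        return requestClusterId == null || clusterId.equals(requestClusterId);
    }

    // The partition-level check stays separate: e.g. whether the requester
    // is a known voter for this partition (stubbed for the sketch).
    public boolean validateVoterOnlyRequest(int voterId, Set<Integer> voters) {
        return voters.contains(voterId);
    }
}
```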
Force-pushed 22f0f76 to b01a7c2
Force-pushed b01a7c2 to ef799af
hachikuji
left a comment
LGTM. Thanks for the patch!
More detailed description of your change
This PR follows up #10129, which added clusterId validation to FetchRequest.
Summary of testing strategy (including rationale)
Unit test.