KAFKA-9823: Remember the sent generation for the coordinator request #8445
guozhangwang merged 15 commits into apache:trunk
cc @abbccdda @hachikuji for reviews.
    log.info("OffsetCommit failed with {} due to group instance id {} fenced", sentGeneration, rebalanceConfig.groupInstanceId);

    // if the generation has changed, do not raise the fatal error but rebalance-in-progress
    if (generationUnchanged()) {
Why could we still survive a fenced instance id in a commit request?
No, we could not "survive". The point of throwing RebalanceInProgress is not to survive, but to give the caller a different error case, so that it does not necessarily have to exit as a zombie but can retry.
    log.info("Attempt to heartbeat failed since member id {} is not valid.", sentGeneration.memberId);
    resetGenerationOnResponseError(ApiKeys.HEARTBEAT, error);
    future.raise(error);
    if (generationUnchanged()) {
In other response handlers, we combine the UNKNOWN_MEMBER_ID and ILLEGAL_GENERATION cases. Do you think we could consolidate here as well?
Yes, I agree we should try to consolidate. Looking at this handler alone, we have basically the same code for ILLEGAL_GENERATION, UNKNOWN_MEMBER_ID, and FENCED_INSTANCE_ID.
SG, let me try it out.
    }

    @Test
    public void testJoinGroupUnknownMemberResponseWithOldGeneration() {
Could we extract the group setup logic up to coordinator.ensureActiveGroup();? It seems redundant to repeat it a couple of times.
Good point, will try it out.
    sentGeneration, rebalanceConfig.groupInstanceId);
    future.raise(error);
    } else {
    // if the generation has changed, then ignore this error
Not sure why we ignore fenced errors here but not in the JoinGroup/SyncGroup handlers.
As explained in the PR description, I intentionally made the logic different. The rationale is that for join / sync we should only have one request in flight at a given time, and if the generation has changed by then, it should be from the heartbeat error handling that resets it. In this case the client should not have re-joined and replaced its member.id, so if the error is returned it still indicates that another member with the same instance.id has replaced the member.id.
For heartbeat, though, we have observed that the following sequence is possible: 1) a heartbeat is sent, 2) the same member rejoins the group and gets a new member.id, 3) the response to the previous heartbeat is handled and the error is set. In this case we should not treat it as fatal.
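The three-step race described above can be replayed in a small, self-contained sketch. It is not the AbstractCoordinator code: the class names, the `Outcome` enum, and the `rejoin` method are hypothetical stand-ins, and the only idea taken from the PR is that the handler remembers the generation at send time and compares it with the current one when the error arrives.

```java
import java.util.Objects;

// Hypothetical stand-in for a group member; not the real AbstractCoordinator.
final class StaleHeartbeatSketch {
    // Minimal generation: generation id plus member id (assumed shape).
    static final class Generation {
        final int generationId;
        final String memberId;

        Generation(int generationId, String memberId) {
            this.generationId = generationId;
            this.memberId = memberId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Generation)) return false;
            Generation other = (Generation) o;
            return generationId == other.generationId && memberId.equals(other.memberId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(generationId, memberId);
        }
    }

    enum Outcome { RAISED, IGNORED_STALE }

    private Generation current;

    StaleHeartbeatSketch(Generation initial) {
        this.current = initial;
    }

    // Simulates a completed rejoin that hands the member a new generation.
    void rejoin(Generation next) {
        this.current = next;
    }

    // Created when the heartbeat is sent; captures the generation at that moment.
    final class HeartbeatHandler {
        private final Generation sentGeneration = current;

        boolean generationUnchanged() {
            return sentGeneration.equals(current);
        }

        // Called when the response carries a member-level error.
        Outcome onMemberError() {
            return generationUnchanged() ? Outcome.RAISED : Outcome.IGNORED_STALE;
        }
    }

    public static void main(String[] args) {
        StaleHeartbeatSketch member = new StaleHeartbeatSketch(new Generation(5, "member-A"));
        HeartbeatHandler handler = member.new HeartbeatHandler(); // 1) heartbeat sent
        member.rejoin(new Generation(6, "member-B"));             // 2) rejoin, new member.id
        System.out.println(handler.onMemberError());              // 3) old response handled
    }
}
```

Because the handler compares against the generation it was sent with, the late error is classified as stale instead of being raised as fatal.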
    return generation != Generation.NO_GENERATION && generation.hasMemberId();
    }

    final void setNewGeneration(final Generation generation) {
Should we make these methods synchronized?
These functions are only used in non-integration unit tests, so I think it is not necessary. LMK if you have a strong motivation.
Just because they are exposed. If they ultimately get used in unit tests involving concurrent threads, we would need them to be synchronized. Seems there's no downside to being on the safe side?
Yup I think so, let me synchronize this then :)
@hachikuji @abbccdda comments replied / addressed. Please take another look.
@hachikuji updated again, also tightened some flakiness I found in the newly added unit tests.
kkonstantine left a comment
I have a couple of questions on a couple of points that caught my eye. Mainly for my education, but also to make sure we won't suppress re-joins on errors and that the logs will represent the exact behavior.
    log.debug("SyncGroup failed: {}", error.message());
    resetGenerationOnResponseError(ApiKeys.SYNC_GROUP, error);
    log.info("SyncGroup failed with {}: {}, would request re-join", sentGeneration, error.message());
    if (generationUnchanged())
Hey @guozhangwang.
Trying to understand if any of these changes have implications for the WorkerCoordinator too.
What I've observed is that if the WorkerCoordinator fails to receive a valid SyncGroup request, it may miss several generations until it succeeds again. Is the branch here safe? resetGenerationOnResponseError will be called (and therefore rejoin will be requested) only if generationUnchanged returns true. Is there any risk that we suppress any useful retries to rejoin?
The rationale is that if the generation has changed, either it was reset by the heartbeat thread, in which case rejoin has already been requested, or it was changed by another join-group request. But since inside AbstractCoordinator we only have one in-flight request at a given time, the second scenario should not happen. So the only possibility is the heartbeat thread resetting it.
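The rationale above can be sketched as follows. This is a simplified model rather than the actual handler: the class, the `NO_GENERATION` sentinel value, the `resetCount` counter, and the string error names are all assumptions made for illustration. It shows that the error is raised to the caller in both branches, and that only the generation reset is skipped when the heartbeat thread has already done it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a member handling a SyncGroup error.
final class SyncGroupErrorSketch {
    static final int NO_GENERATION = -1; // assumed sentinel for "no generation"

    private int currentGenerationId;
    private boolean rejoinNeeded = false;
    private int resetCount = 0;

    // Errors propagated to the caller, which would then re-send the join-group.
    final List<String> raised = new ArrayList<>();

    SyncGroupErrorSketch(int generationId) {
        this.currentGenerationId = generationId;
    }

    // What either the heartbeat thread or the SyncGroup handler does on a member-level error.
    void resetGeneration() {
        currentGenerationId = NO_GENERATION;
        rejoinNeeded = true;
        resetCount++;
    }

    boolean rejoinNeeded() { return rejoinNeeded; }
    int resetCount() { return resetCount; }
    int currentGenerationId() { return currentGenerationId; }

    // sentGenerationId is the generation remembered when the SyncGroup request was sent.
    void onSyncGroupError(int sentGenerationId, String error) {
        if (sentGenerationId == currentGenerationId) {
            resetGeneration(); // nobody has reset it yet, so do it here
        }
        raised.add(error);     // raised either way, so the caller retries the join
    }

    public static void main(String[] args) {
        SyncGroupErrorSketch unchanged = new SyncGroupErrorSketch(3);
        unchanged.onSyncGroupError(3, "ILLEGAL_GENERATION");

        SyncGroupErrorSketch alreadyReset = new SyncGroupErrorSketch(3);
        alreadyReset.resetGeneration(); // the heartbeat thread got there first
        alreadyReset.onSyncGroupError(3, "UNKNOWN_MEMBER_ID");

        System.out.println(unchanged.rejoinNeeded() + " " + unchanged.resetCount());
        System.out.println(alreadyReset.rejoinNeeded() + " " + alreadyReset.resetCount());
    }
}
```

In both scenarios the rejoin flag ends up set and the error reaches the caller, so under these assumptions no retry is suppressed.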
    || error == Errors.ILLEGAL_GENERATION) {
    log.debug("SyncGroup failed: {}", error.message());
    resetGenerationOnResponseError(ApiKeys.SYNC_GROUP, error);
    log.info("SyncGroup failed with {}: {}, would request re-join", sentGeneration, error.message());
nit: "would request re-join" or "will request re-join"?
More importantly, the log message is outside of the if branch below. But if generationUnchanged() is false, then re-join won't be requested, at least not by calling resetGenerationOnResponseError -> resetGeneration -> rejoinNeeded = true;. a) Is that safe (as I ask below)? b) Is the log message accurate if there's a chance that re-join is not requested? (Accuracy of log messages is crucial here, because usually that's all we have when troubleshooting rebalances.)
Note that in the caller joinGroupIfNeeded, when the future fails (here, with either illegal generation or unknown member), the join-group would be resent. The only thing we skip is the resetting of the generation, since it has already changed.
Not sure how this comment was changed, but this still doesn't read well for me.
Still seems that the message should say "... will request re-join".
    future.raise(error);
    } else if (error == Errors.ILLEGAL_GENERATION ||
    error == Errors.UNKNOWN_MEMBER_ID ||
    error == Errors.FENCED_INSTANCE_ID) {
If the consumer has been legitimately fenced, is it safe to rejoin the group after resetting the member id? Would that not lead to a ping-pong scenario?
Note that we set the InstanceFenced error in the future, and hence in the caller:
    final RuntimeException exception = future.exception();
    log.info("Join group failed with {}", exception.toString());
    resetJoinGroupFuture();
    if (exception instanceof UnknownMemberIdException ||
        exception instanceof RebalanceInProgressException ||
        exception instanceof IllegalGenerationException ||
        exception instanceof MemberIdRequiredException)
        continue;
    else if (!future.isRetriable())
        throw exception;
We would still throw that exception, since it is not retriable.
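A simplified model of that caller loop, under stated assumptions, is sketched here. `RejoinableException` and `FencedException` are hypothetical stand-ins for the real Kafka exception classes, the scripted list of outcomes replaces actual network round-trips, and the back-off path for other retriable errors is left out.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a join loop that retries some failures and rethrows others.
final class JoinRetrySketch {
    // Stand-in for errors such as unknown member id or illegal generation.
    static class RejoinableException extends RuntimeException {
        RejoinableException(String message) { super(message); }
    }

    // Stand-in for the fenced instance id error, which is treated as fatal.
    static class FencedException extends RuntimeException {
        FencedException(String message) { super(message); }
    }

    // Each list element is the failure of one attempt; null means the attempt succeeded.
    // Returns the number of attempts made before success.
    static int joinUntilDone(List<? extends RuntimeException> scriptedOutcomes) {
        int attempts = 0;
        for (RuntimeException failure : scriptedOutcomes) {
            attempts++;
            if (failure == null) {
                return attempts;  // joined successfully
            }
            if (failure instanceof RejoinableException) {
                continue;         // re-send the join-group
            }
            throw failure;        // anything else is surfaced to the caller
        }
        throw new IllegalStateException("no more scripted outcomes");
    }

    public static void main(String[] args) {
        System.out.println(joinUntilDone(
            Arrays.asList(new RejoinableException("UNKNOWN_MEMBER_ID"), null)));
        try {
            joinUntilDone(Arrays.asList(new FencedException("FENCED_INSTANCE_ID"), null));
        } catch (FencedException e) {
            System.out.println("fatal: " + e.getMessage());
        }
    }
}
```

In this model the fenced case never reaches a second attempt, which is why the ping-pong scenario raised in the question would not occur.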
    log.error("Received fatal exception: group.instance.id gets fenced");
    // for join-group request, even if the generation has changed we would not expect the instance id
    // gets fenced, and hence we always treat this as a fatal error
    log.error("Attempt to join group failed due to group instance id {} gets fenced with {}",
nit: can fix the grammar a little:
Attempt to join group with generation {} failed because the group instance id {} has been fenced by another instance.
Similarly for a couple below.
    future.complete(null);
    } else {
    log.error("LeaveGroup request failed with error: {}", error.message());
    log.error("LeaveGroup response with {} failed with error: {}", sentGeneration, error.message());
nit: seems the original was a little more accurate? The response did not fail; it just contained an error indicating the request had failed.
    error == Errors.UNKNOWN_MEMBER_ID ||
    error == Errors.FENCED_INSTANCE_ID) {
    if (generationUnchanged()) {
    log.info("Attempt to heartbeat failed with generation {} and group instance id {} due to {}, resetting generation",
nit: Could we just use the error code in this message? Otherwise we get weird messages like this:
Attempt to heartbeat failed with generation .. due to The coordinator is not aware of this member, resetting generation.
retest this please
hachikuji left a comment
LGTM. Left a nit on the test cases.
    coordinator.ensureActiveGroup();
    TestUtils.waitForCondition(() -> !mockClient.requests().isEmpty(), 2000,
        "The join-group request was not sent in time after");
nit: after.. what? I think you can drop "in time after." Here is the assertion that is used:
assertThat("Condition not met within timeout " + maxWaitMs + ". " + conditionDetails,
testCondition.conditionMet());
    resetGenerationOnResponseError(ApiKeys.OFFSET_COMMIT, error);
    future.raise(new CommitFailedException());
    } else {
    future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " +
This seems a bit misleading, because the consumer is not actually participating in an ongoing rebalance (yet).
@guozhangwang can you elaborate on why we don't just throw CommitFailedException here? It seems like it must be the case that the caller is a zombie and should get that exception to alert it, not the RebalanceInProgressException, which seems to indicate it might be recoverable.
If the generation has changed since the commit request was sent, then it is likely that the member has participated in a new rebalance (and hence got a new generation), or that it has reset its generation due to the heartbeat failure. So this commit failure is recoverable.
Chatted about this offline, will just leave the concrete proposal here:
    ...
    } else {
        if (state == MemberState.REBALANCING) {
            future.raise(new RebalanceInProgressException(...));
        } else {
            future.raise(new CommitFailedException(...));
        }
    }
The reasoning being that we use CommitFailedException to signal we have dropped out of the group, and RebalanceInProgressException to signal that a rebalance is in progress. There are two cases to consider (within the general case of the generation having changed):
1. If the generation is unknown and the state is STABLE, this means we have dropped out of the group, but haven't yet rejoined and haven't invoked onPartitionsLost --> should throw CommitFailed
2. If the generation is unknown and the state is REBALANCING, this means we dropped out of the group, but have already noticed and rejoined, and already invoked onPartitionsLost --> should throw RebalanceInProgress
Note that if we dropped out of the group and already completed the rejoin, the state will be STABLE again but the generation will also have been set, so this case does not apply. Basically, we want to keep CommitFailedException to indicate that the consumer definitely dropped out and will have to rejoin, which is the case in 1. above.
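The decision described above fits in one small function. This is a sketch, assuming that the unchanged-generation branch raises CommitFailedException as in the diff context; the two enums are hypothetical, and `MemberState` only mirrors the two state names used in the proposal rather than the real enum.

```java
// Hypothetical classification of a failed offset commit.
final class CommitErrorSketch {
    enum MemberState { STABLE, REBALANCING } // assumed subset of states
    enum Raised { COMMIT_FAILED, REBALANCE_IN_PROGRESS }

    // generationUnchanged: whether the generation sent with the commit is still current.
    static Raised classify(boolean generationUnchanged, MemberState state) {
        if (generationUnchanged) {
            // The member is fenced out for the generation it committed with.
            return Raised.COMMIT_FAILED;
        }
        // Generation changed: only signal a rebalance if one is actually in progress.
        return state == MemberState.REBALANCING
            ? Raised.REBALANCE_IN_PROGRESS
            : Raised.COMMIT_FAILED;
    }

    public static void main(String[] args) {
        System.out.println(classify(true, MemberState.STABLE));
        System.out.println(classify(false, MemberState.STABLE));      // case 1
        System.out.println(classify(false, MemberState.REBALANCING)); // case 2
    }
}
```

Keeping the decision in a pure function like this makes the two cases easy to cover in a unit test without a mock client.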
For join / sync / commit / heartbeat requests, we remember the sent generation in the created handler object. Upon getting the error code, we can then check whether the sent generation still matches the current generation. If not, it means that the member has already reset its generation or has already participated in a new rebalance. This means:
For join / sync-group requests, we no longer need to call reset-generation for illegal-generation / unknown-member. But we still set the error, since at any given time only one join / sync round-trip is in flight, and hence we should not be participating in a new rebalance. The fenced-instance error is also still treated as fatal: since we should not be participating in a new rebalance, it is still not expected.
For commit requests, we do not set the corresponding error for illegal-generation / unknown-member / fenced-instance but raise rebalance-in-progress instead. For commit-sync it is still thrown to the user, while for commit-async it is logged and swallowed.
For heartbeat requests, we do not treat illegal-generation / unknown-member / fenced-instance as errors and just consider the heartbeat as succeeded, since it should be a stale heartbeat which can be ignored.