KAFKA-12352: Make sure all rejoin group and reset state has a reason#10232
Conversation
guozhangwang
left a comment
There was a problem hiding this comment.
@ableegoldman @vcrfxia for reviews.
| } | ||
| } | ||
| } else { | ||
| requestRejoin(); |
There was a problem hiding this comment.
We can remove this since it is a bit redundant now as we call for each case if necessary.
There was a problem hiding this comment.
Just to clarify, you mean we don't need to rejoin here since we will always raise an error, and always rejoin (if necessary) when checking that error?
Or are you referring to the requestRejoinOnResponseError calls you added to the two last cases in the below if/else?
There was a problem hiding this comment.
I meant the latter: we call that inside the conditions already -- for those fatal errors, we do not need to call this anyways since the consumer will throw and crash.
There was a problem hiding this comment.
@guozhangwang I think something may have been messed up during a merge/rebase: I no longer see requestRejoinOnResponseError being invoked anywhere
There was a problem hiding this comment.
I added that function for sync group handler that handles retriable COORDINATOR_NOT_AVAILABLE / NOT_COORDINATOR and any unexpected error. After the refactoring PR they are not all fall into the joinGroupIfNeeded in
final RuntimeException exception = future.exception();
resetJoinGroupFuture();
if (exception instanceof UnknownMemberIdException ||
exception instanceof IllegalGenerationException ||
exception instanceof RebalanceInProgressException ||
exception instanceof MemberIdRequiredException)
continue;
else if (!future.isRetriable())
throw exception;
resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception));
timer.sleep(rebalanceConfig.retryBackoffMs);
This is part of the principle I mentioned:
We may reset generationa and request rejoin in two different places: 1) in join/sync-group handler, and 2) in joinGroupIfNeeded, when the future is received. The principle is that these two should not overlap, and 2) is used as a fallback for those common errors from join/sync that we do not handle specifically.
But I forgot to remove this function as part of the second pass; will remove.
There was a problem hiding this comment.
Ok cool, thanks. One last question then: after this refactoring, since we no longer call requestRejoinOnResponseError below, should we re-add the requestRejoin() call here? Or add a requestRejoin to the specific cases in the SyncGroup handler, eg
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
"Sent generation was {}", sentGeneration);
future.raise(error);
}
There was a problem hiding this comment.
I think we do not need to, since it would be called on resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception)); --- previously we are calling rejoin double times.
There was a problem hiding this comment.
Hmm...but resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception)); is only called in joinGroupIfNeeded which is only called in ensureActiveGroup, which is in turn only invoked in ConsumerCoordinator#poll.
That said, inside SyncGroupResponseHandler#handle we would already have rejoinNeeded = true and only set it to false if the SyncGroup succeeds. So for that reason I guess we don't need the requestRejoin anywhere inside the SyncGroup handler
| @Override | ||
| public void onFailure(RuntimeException e, RequestFuture<Void> future) { | ||
| log.debug("FindCoordinator request failed due to {}", e); | ||
| log.debug("FindCoordinator request failed due to {}", e.toString()); |
There was a problem hiding this comment.
Minor cleanup, we only need to print the error message but not the stack trace.
| } | ||
|
|
||
| synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) { | ||
| log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api); |
There was a problem hiding this comment.
Note that I intentionally bumped up the log level from debug to info here since I think this is necessarily a message that users should pay attention to in production, where they mostly use INFO. Open for counter suggestions though.
There was a problem hiding this comment.
SGTM. If we find it flooding the logs and not helpful we can reconsider
ableegoldman
left a comment
There was a problem hiding this comment.
Build failed with Execution failed for task ':connect:runtime:compileJava', I guess trunk is broken atm?
| } | ||
| } | ||
| } else { | ||
| requestRejoin(); |
There was a problem hiding this comment.
Just to clarify, you mean we don't need to rejoin here since we will always raise an error, and always rejoin (if necessary) when checking that error?
Or are you referring to the requestRejoinOnResponseError calls you added to the two last cases in the below if/else?
| } | ||
|
|
||
| synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) { | ||
| log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api); |
There was a problem hiding this comment.
SGTM. If we find it flooding the logs and not helpful we can reconsider
| } | ||
|
|
||
| private synchronized void resetState() { | ||
| private synchronized void resetState(final String reason) { |
There was a problem hiding this comment.
nit: rename to resetStateAndGeneration?
| log.info("SyncGroup failed: {} Marking coordinator unknown. Sent generation was {}", | ||
| error.message(), sentGeneration); | ||
| markCoordinatorUnknown(error); | ||
| requestRejoinOnResponseError(ApiKeys.SYNC_GROUP, error); |
There was a problem hiding this comment.
Why do we explicitly rejoin in this case, but not eg REBALANCE_IN_PROGRESS? or UNKNOWN_MEMBER_ID/ILLEGAL_GENERATION ?
There was a problem hiding this comment.
You're right, we do not, I've updated this section.
…ebalance-trigger-event-logging
…ebalance-trigger-event-logging
|
We may reset generationa and request rejoin in two different places: 1) in join/sync-group handler, and 2) in joinGroupIfNeeded, when the future is received. The principle is that these two should not overlap, and 2) is used as a fallback for those common errors from join/sync that we do not handle specifically. |
ableegoldman
left a comment
There was a problem hiding this comment.
LGTM, thanks for the improvement! Feel free to merge if the build passes
|
Failed with unrelated |
Conflicts: * Jenkinsfile: `install` -> `publishToMavenLocal`, drop ARM build and other changes that don't make sense for Confluent's version of `Jenkinsfile`. * build.gradle: keep Confluent changes for automatic skipping signing for specific version patterns (upstream only does it if the version ends with `SNAPSHOT`). Commits: * apache-github/trunk: (59 commits) MINOR: Remove redundant allows in import-control.xml (apache#10339) MINOR: remove some specifying types in tool command (apache#10329) KAFKA-12455: Fix OffsetValidationTest.test_broker_rolling_bounce failure with Raft (apache#10322) MINOR: Add toString to various Kafka Metrics classes (apache#10330) KAFKA-12330; FetchSessionCache may cause starvation for partitions when FetchResponse is full (apache#10318) KAFKA-12427: Don't update connection idle time for muted connections (apache#10267) MINOR; Various code cleanups (apache#10319) HOTFIX: timeout issue in removeStreamThread() (apache#10321) revert stream logging level back to ERROR (apache#10320) KAFKA-12352: Make sure all rejoin group and reset state has a reason (apache#10232) KAFKA-10348: Share client channel between forwarding and auto creation manager (apache#10135) MINOR: Update year in NOTICE (apache#10308) KAFKA-12398: Fix flaky test `ConsumerBounceTest.testClose` (apache#10243) MINOR: Remove redundant inheritance from FilteringJmxReporter #onMetricRemoved (apache#10303) KAFKA-12462: proceed with task revocation in case of thread in PENDING_SHUTDOWN (apache#10311) KAFKA-12460; Do not allow raft truncation below high watermark (apache#10310) MINOR: Log project, gradle, java and scala versions at the start of the build (apache#10307) KAFKA-10357: Add missing repartition topic validation (apache#10305) MINOR: Improve error message in MirrorConnectorsIntegrationBaseTest (apache#10268) MINOR: Add missing unit tests for Mirror Connect (apache#10192) ...
Committer Checklist (excluded from commit message)