Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -2289,7 +2289,7 @@ public void enforceRebalance() {
if (coordinator == null) {
throw new IllegalStateException("Tried to force a rebalance but consumer does not have a group.");
}
coordinator.requestRejoin();
coordinator.requestRejoin("rebalance enforced by user");
} finally {
release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,31 +455,27 @@ boolean joinGroupIfNeeded(final Timer timer) {
resetJoinGroupFuture();
needsJoinPrepare = true;
} else {
log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " +
"the rebalance callback is triggered, marking this rebalance as failed and retry",
generationSnapshot, stateSnapshot);
resetStateAndRejoin();
final String reason = String.format("rebalance failed since the generation/state was " +
"modified by heartbeat thread to %s/%s before the rebalance callback triggered",
generationSnapshot, stateSnapshot);

resetStateAndRejoin(reason);
resetJoinGroupFuture();
}
} else {
final RuntimeException exception = future.exception();

// we do not need to log error for memberId required,
// since it is not really an error and is transient
if (!(exception instanceof MemberIdRequiredException)) {
log.info("Rebalance failed.", exception);
}

resetJoinGroupFuture();

if (exception instanceof UnknownMemberIdException ||
exception instanceof RebalanceInProgressException ||
exception instanceof IllegalGenerationException ||
exception instanceof RebalanceInProgressException ||
exception instanceof MemberIdRequiredException)
continue;
else if (!future.isRetriable())
throw exception;

resetStateAndRejoin();
resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception));
timer.sleep(rebalanceConfig.retryBackoffMs);
}
}
Expand Down Expand Up @@ -655,6 +651,8 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> fut
synchronized (AbstractCoordinator.this) {
AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, memberId, null);
}
requestRejoin("need to re-join with the given member-id");

future.raise(error);
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.info("JoinGroup failed due to non-fatal error: REBALANCE_IN_PROGRESS, " +
Expand Down Expand Up @@ -775,8 +773,6 @@ public void handle(SyncGroupResponse syncResponse,
}
}
} else {
requestRejoin();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this since it is a bit redundant now as we call for each case if necessary.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify, you mean we don't need to rejoin here since we will always raise an error, and always rejoin (if necessary) when checking that error?

Or are you referring to the requestRejoinOnResponseError calls you added to the two last cases in the below if/else?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant the latter: we call that inside the conditions already -- for those fatal errors, we do not need to call this anyways since the consumer will throw and crash.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang I think something may have been messed up during a merge/rebase: I no longer see requestRejoinOnResponseError being invoked anywhere

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added that function for sync group handler that handles retriable COORDINATOR_NOT_AVAILABLE / NOT_COORDINATOR and any unexpected error. After the refactoring PR they are not all fall into the joinGroupIfNeeded in

final RuntimeException exception = future.exception();

                resetJoinGroupFuture();

                if (exception instanceof UnknownMemberIdException ||
                    exception instanceof IllegalGenerationException ||
                    exception instanceof RebalanceInProgressException ||
                    exception instanceof MemberIdRequiredException)
                    continue;
                else if (!future.isRetriable())
                    throw exception;

                resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception));
                timer.sleep(rebalanceConfig.retryBackoffMs);

This is part of the principle I mentioned:

We may reset generationa and request rejoin in two different places: 1) in join/sync-group handler, and 2) in joinGroupIfNeeded, when the future is received. The principle is that these two should not overlap, and 2) is used as a fallback for those common errors from join/sync that we do not handle specifically.

But I forgot to remove this function as part of the second pass; will remove.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok cool, thanks. One last question then: after this refactoring, since we no longer call requestRejoinOnResponseError below, should we re-add the requestRejoin() call here? Or add a requestRejoin to the specific cases in the SyncGroup handler, eg

} else if (error == Errors.REBALANCE_IN_PROGRESS) {
    log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
                      "Sent generation was {}", sentGeneration);
    future.raise(error);
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we do not need to, since it would be called on resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception)); --- previously we are calling rejoin double times.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm...but resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception)); is only called in joinGroupIfNeeded which is only called in ensureActiveGroup, which is in turn only invoked in ConsumerCoordinator#poll.
That said, inside SyncGroupResponseHandler#handle we would already have rejoinNeeded = true and only set it to false if the SyncGroup succeeds. So for that reason I guess we don't need the requestRejoin anywhere inside the SyncGroup handler


if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
Expand Down Expand Up @@ -860,7 +856,7 @@ public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {

@Override
public void onFailure(RuntimeException e, RequestFuture<Void> future) {
log.debug("FindCoordinator request failed due to {}", e);
log.debug("FindCoordinator request failed due to {}", e.toString());
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor cleanup, we only need to print the error message but not the stack trace.


if (!(e instanceof RetriableException)) {
// Remember the exception if fatal so we can ensure it gets thrown by the main thread
Expand Down Expand Up @@ -961,29 +957,29 @@ protected synchronized String memberId() {
return generation.memberId;
}

private synchronized void resetState() {
private synchronized void resetStateAndGeneration(final String reason) {
log.info("Resetting generation due to: {}", reason);

state = MemberState.UNJOINED;
generation = Generation.NO_GENERATION;
}

private synchronized void resetStateAndRejoin() {
resetState();
rejoinNeeded = true;
private synchronized void resetStateAndRejoin(final String reason) {
resetStateAndGeneration(reason);
requestRejoin(reason);
}

synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) {
log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that I intentionally bumped up the log level from debug to info here since I think this is necessarily a message that users should pay attention to in production, where they mostly use INFO. Open for counter suggestions though.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM. If we find it flooding the logs and not helpful we can reconsider


resetStateAndRejoin();
final String reason = String.format("encountered %s from %s response", error, api);
resetStateAndRejoin(reason);
}

synchronized void resetGenerationOnLeaveGroup() {
log.debug("Resetting generation due to consumer pro-actively leaving the group");

resetStateAndRejoin();
resetStateAndRejoin("consumer pro-actively leaving the group");
}

public synchronized void requestRejoin() {
public synchronized void requestRejoin(final String reason) {
log.info("Request joining group due to: {}", reason);
this.rejoinNeeded = true;
}

Expand Down Expand Up @@ -1120,8 +1116,7 @@ public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> futu
// since we may be sending the request during rebalance, we should check
// this case and ignore the REBALANCE_IN_PROGRESS error
if (state == MemberState.STABLE) {
log.info("Attempt to heartbeat failed since group is rebalancing");
requestRejoin();
requestRejoin("group is already rebalancing");
future.raise(error);
} else {
log.debug("Ignoring heartbeat response with error {} during {} state", error, state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,10 @@ protected void onJoinComplete(int generation,
Set<TopicPartition> assignedPartitions = new HashSet<>(assignment.partitions());

if (!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) {
log.warn("We received an assignment {} that doesn't match our current subscription {}; it is likely " +
"that the subscription has changed since we joined the group. Will try re-join the group with current subscription",
assignment.partitions(), subscriptions.prettyString());

requestRejoin();
final String reason = String.format("received assignment %s does not match the current subscription %s; " +
"it is likely that the subscription has changed since we joined the group, will re-join with current subscription",
assignment.partitions(), subscriptions.prettyString());
requestRejoin(reason);

return;
}
Expand Down Expand Up @@ -408,8 +407,9 @@ protected void onJoinComplete(int generation,
firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));

// If revoked any partitions, need to re-join the group afterwards
log.info("Need to revoke partitions {} and re-join the group", revokedPartitions);
requestRejoin();
final String reason = String.format("need to revoke partitions %s as indicated " +
"by the current assignment and re-join", revokedPartitions);
requestRejoin(reason);
}
}

Expand Down Expand Up @@ -769,19 +769,17 @@ public boolean rejoinNeededOrPending() {
// we need to rejoin if we performed the assignment and metadata has changed;
// also for those owned-but-no-longer-existed partitions we should drop them as lost
if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) {
log.info("Requesting to re-join the group and trigger rebalance since the assignment metadata has changed from {} to {}",
assignmentSnapshot, metadataSnapshot);

requestRejoin();
final String reason = String.format("cached metadata has changed from %s at the beginning of the rebalance to %s",
assignmentSnapshot, metadataSnapshot);
requestRejoin(reason);
return true;
}

// we need to join if our subscription has changed since the last join
if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) {
log.info("Requesting to re-join the group and trigger rebalance since the subscription has changed from {} to {}",
final String reason = String.format("subscription has changed from %s at the beginning of the rebalance to %s",
joinedSubscription, subscriptions.subscription());

requestRejoin();
requestRejoin(reason);
return true;
}

Expand Down Expand Up @@ -1236,7 +1234,7 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> futu
* request re-join but do not reset generations. If the callers decide to retry they
* can go ahead and call poll to finish up the rebalance first, and then try commit again.
*/
requestRejoin();
requestRejoin("offset commit failed since group is already rebalancing");
future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " +
"consumer group is executing a rebalance at the moment. You can try completing the rebalance " +
"by calling poll() and then retry commit again"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ public void testHeartbeatRebalanceInProgressResponseDuringRebalancing() throws I

mockClient.respond(heartbeatResponse(Errors.REBALANCE_IN_PROGRESS));

coordinator.requestRejoin();
coordinator.requestRejoin("test");

TestUtils.waitForCondition(() -> {
coordinator.ensureActiveGroup(new MockTime(1L).timer(100L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ public WorkerCoordinator(GroupRebalanceConfig config,
}

@Override
public void requestRejoin() {
public void requestRejoin(final String reason) {
log.debug("Request joining group due to: {}", reason);
rejoinRequested = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public String memberId() {
}

public void requestRejoin() {
coordinator.requestRejoin();
coordinator.requestRejoin("connect worker requested rejoin");
}

public void maybeLeaveGroup(String leaveReason) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ public void testRejoinGroup() {
assertEquals(Collections.singletonList(taskId1x0), rebalanceListener.assignment.tasks());

// and join the group again
coordinator.requestRejoin();
coordinator.requestRejoin("test");
client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId1),
Collections.emptyList(), Errors.NONE));
Expand Down