From b17fc788a5220ebdb3a209ae300623171cd44131 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Sun, 28 Feb 2021 22:41:35 -0800 Subject: [PATCH 1/5] make sure all rejoin groupa and reset state has a reason --- .../kafka/clients/consumer/KafkaConsumer.java | 2 +- .../internals/AbstractCoordinator.java | 45 ++++++++++--------- .../internals/ConsumerCoordinator.java | 28 ++++++------ .../internals/AbstractCoordinatorTest.java | 2 +- 4 files changed, 40 insertions(+), 37 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index f7760bbe890a4..6b3bbf00bc001 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -2263,7 +2263,7 @@ public void enforceRebalance() { if (coordinator == null) { throw new IllegalStateException("Tried to force a rebalance but consumer does not have a group."); } - coordinator.requestRejoin(); + coordinator.requestRejoin("rebalance enforced by user"); } finally { release(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 34f81db306d33..7b3591c3394cf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -455,10 +455,11 @@ boolean joinGroupIfNeeded(final Timer timer) { resetJoinGroupFuture(); needsJoinPrepare = true; } else { - log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " + - "the rebalance callback is triggered, marking this rebalance as failed and retry", - generationSnapshot, stateSnapshot); - resetStateAndRejoin(); + final String reason = String.format("rebalance failed since the generation/state was " + + "modified by heartbeat thread to %s/%s before the rebalance callback triggered", + generationSnapshot, stateSnapshot); + + resetStateAndRejoin(reason); resetJoinGroupFuture(); } } else { @@ -479,7 +480,7 @@ boolean joinGroupIfNeeded(final Timer timer) { else if (!future.isRetriable()) throw exception; - resetStateAndRejoin(); + resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception)); timer.sleep(rebalanceConfig.retryBackoffMs); } } @@ -775,8 +776,6 @@ public void handle(SyncGroupResponse syncResponse, } } } else { - requestRejoin(); - if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId)); } else if (error == Errors.REBALANCE_IN_PROGRESS) { @@ -802,8 +801,10 @@ public void handle(SyncGroupResponse syncResponse, log.info("SyncGroup failed: {} Marking coordinator unknown. Sent generation was {}", error.message(), sentGeneration); markCoordinatorUnknown(error); + requestRejoinOnResponseError(ApiKeys.SYNC_GROUP, error); future.raise(error); } else { + requestRejoinOnResponseError(ApiKeys.SYNC_GROUP, error); future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message())); } } @@ -860,7 +861,7 @@ public void onSuccess(ClientResponse resp, RequestFuture future) { @Override public void onFailure(RuntimeException e, RequestFuture future) { - log.debug("FindCoordinator request failed due to {}", e); + log.debug("FindCoordinator request failed due to {}", e.toString()); if (!(e instanceof RetriableException)) { // Remember the exception if fatal so we can ensure it gets thrown by the main thread @@ -961,29 +962,34 @@ protected synchronized String memberId() { return generation.memberId; } - private synchronized void resetState() { + private synchronized void resetState(final String reason) { + log.info("Resetting generation due to: {}", reason); + state = MemberState.UNJOINED; generation = Generation.NO_GENERATION; } - private synchronized void resetStateAndRejoin() { - resetState(); - rejoinNeeded = true; + private synchronized void resetStateAndRejoin(final String reason) { + resetState(reason); + requestRejoin(reason); } synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) { - log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api); - - resetStateAndRejoin(); + final String reason = String.format("encountered %s from %s response", error, api); + resetStateAndRejoin(reason); } synchronized void resetGenerationOnLeaveGroup() { - log.debug("Resetting generation due to consumer pro-actively leaving the group"); + resetStateAndRejoin("consumer pro-actively leaving the group"); + } - resetStateAndRejoin(); + synchronized void requestRejoinOnResponseError(ApiKeys api, Errors error) { + final String reason = String.format("encountered %s from %s response", error, api); + requestRejoin(reason); } - public synchronized void requestRejoin() { + public synchronized void requestRejoin(final String reason) { + log.info("Request joining group due to: {}", reason); this.rejoinNeeded = true; } @@ -1120,8 +1126,7 @@ public void handle(HeartbeatResponse heartbeatResponse, RequestFuture futu // since we may be sending the request during rebalance, we should check // this case and ignore the REBALANCE_IN_PROGRESS error if (state == MemberState.STABLE) { - log.info("Attempt to heartbeat failed since group is rebalancing"); - requestRejoin(); + requestRejoin("group is already rebalancing"); future.raise(error); } else { log.debug("Ignoring heartbeat response with error {} during {} state", error, state); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index b415b229fda7c..226297a4d11e8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -373,11 +373,10 @@ protected void onJoinComplete(int generation, Set assignedPartitions = new HashSet<>(assignment.partitions()); if (!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) { - log.warn("We received an assignment {} that doesn't match our current subscription {}; it is likely " + - "that the subscription has changed since we joined the group. Will try re-join the group with current subscription", - assignment.partitions(), subscriptions.prettyString()); - - requestRejoin(); + final String reason = String.format("received assignment %s does not match the current subscription %s; " + + "it is likely that the subscription has changed since we joined the group, will re-join with current subscription", + assignment.partitions(), subscriptions.prettyString()); + requestRejoin(reason); return; } @@ -408,8 +407,9 @@ protected void onJoinComplete(int generation, firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions)); // If revoked any partitions, need to re-join the group afterwards - log.info("Need to revoke partitions {} and re-join the group", revokedPartitions); - requestRejoin(); + final String reason = String.format("need to revoke partitions %s as indicated " + + "by the current assignment and re-join", revokedPartitions); + requestRejoin(reason); } } @@ -769,19 +769,17 @@ public boolean rejoinNeededOrPending() { // we need to rejoin if we performed the assignment and metadata has changed; // also for those owned-but-no-longer-existed partitions we should drop them as lost if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) { - log.info("Requesting to re-join the group and trigger rebalance since the assignment metadata has changed from {} to {}", - assignmentSnapshot, metadataSnapshot); - - requestRejoin(); + final String reason = String.format("cached metadata has changed from %s at the beginning of the rebalance to %s", + assignmentSnapshot, metadataSnapshot); + requestRejoin(reason); return true; } // we need to join if our subscription has changed since the last join if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) { - log.info("Requesting to re-join the group and trigger rebalance since the subscription has changed from {} to {}", + final String reason = String.format("subscription has changed from %s at the beginning of the rebalance to %s", joinedSubscription, subscriptions.subscription()); - - requestRejoin(); + requestRejoin(reason); return true; } @@ -1236,7 +1234,7 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture futu * request re-join but do not reset generations. If the callers decide to retry they * can go ahead and call poll to finish up the rebalance first, and then try commit again. */ - requestRejoin(); + requestRejoin("offset commit failed since group is already rebalancing"); future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " + "consumer group is executing a rebalance at the moment. You can try completing the rebalance " + "by calling poll() and then retry commit again")); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index c88cf742d0d26..c0606f147ece5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -737,7 +737,7 @@ public void testHeartbeatRebalanceInProgressResponseDuringRebalancing() throws I mockClient.respond(heartbeatResponse(Errors.REBALANCE_IN_PROGRESS)); - coordinator.requestRejoin(); + coordinator.requestRejoin("test"); TestUtils.waitForCondition(() -> { coordinator.ensureActiveGroup(new MockTime(1L).timer(100L)); From c36c6a71ebdebe4c2600d7009315f32338602e38 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 5 Mar 2021 23:48:33 -0800 Subject: [PATCH 2/5] github comments --- .../internals/AbstractCoordinator.java | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 4624f57dbb4ee..df51eb0c6f5b2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.CoordinatorLoadInProgressException; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.FencedInstanceIdException; import org.apache.kafka.common.errors.GroupAuthorizationException; @@ -465,22 +466,19 @@ boolean joinGroupIfNeeded(final Timer timer) { } else { final RuntimeException exception = future.exception(); - // we do not need to log error for memberId required, - // since it is not really an error and is transient - if (!(exception instanceof MemberIdRequiredException)) { - log.info("Rebalance failed.", exception); - } - resetJoinGroupFuture(); + if (exception instanceof UnknownMemberIdException || - exception instanceof RebalanceInProgressException || exception instanceof IllegalGenerationException || + exception instanceof RebalanceInProgressException || exception instanceof MemberIdRequiredException) continue; else if (!future.isRetriable()) throw exception; - resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception)); + if (!(exception instanceof CoordinatorLoadInProgressException)) + resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception)); + timer.sleep(rebalanceConfig.retryBackoffMs); } } @@ -656,6 +654,8 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture fut synchronized (AbstractCoordinator.this) { AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, memberId, null); } + requestRejoin("need to re-join with the given member-id"); + future.raise(error); } else if (error == Errors.REBALANCE_IN_PROGRESS) { log.info("JoinGroup failed due to non-fatal error: REBALANCE_IN_PROGRESS, " + @@ -801,10 +801,8 @@ public void handle(SyncGroupResponse syncResponse, log.info("SyncGroup failed: {} Marking coordinator unknown. Sent generation was {}", error.message(), sentGeneration); markCoordinatorUnknown(error); - requestRejoinOnResponseError(ApiKeys.SYNC_GROUP, error); future.raise(error); } else { - requestRejoinOnResponseError(ApiKeys.SYNC_GROUP, error); future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message())); } } @@ -962,7 +960,7 @@ protected synchronized String memberId() { return generation.memberId; } - private synchronized void resetState(final String reason) { + private synchronized void resetStateAndGeneration(final String reason) { log.info("Resetting generation due to: {}", reason); state = MemberState.UNJOINED; @@ -970,7 +968,7 @@ private synchronized void resetState(final String reason) { } private synchronized void resetStateAndRejoin(final String reason) { - resetState(reason); + resetStateAndGeneration(reason); requestRejoin(reason); } From ac15c596778c5d09961a513a78910a66493b5589 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Sat, 6 Mar 2021 08:04:29 -0800 Subject: [PATCH 3/5] incorporate connect changes --- .../kafka/clients/consumer/internals/AbstractCoordinator.java | 4 +--- .../kafka/connect/runtime/distributed/WorkerCoordinator.java | 3 ++- .../kafka/connect/runtime/distributed/WorkerGroupMember.java | 2 +- .../connect/runtime/distributed/WorkerCoordinatorTest.java | 2 +- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index df51eb0c6f5b2..8f6d68a021bd1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -476,9 +476,7 @@ boolean joinGroupIfNeeded(final Timer timer) { else if (!future.isRetriable()) throw exception; - if (!(exception instanceof CoordinatorLoadInProgressException)) - resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception)); - + resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception)); timer.sleep(rebalanceConfig.retryBackoffMs); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java index 110ef00a01b2c..425213fd94b78 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java @@ -101,7 +101,8 @@ public WorkerCoordinator(GroupRebalanceConfig config, } @Override - public void requestRejoin() { + public void requestRejoin(final String reason) { + log.debug("Request joining group due to: {}", reason); rejoinRequested = true; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index 2a9a9f2a2bac6..64576e8915473 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -200,7 +200,7 @@ public String memberId() { } public void requestRejoin() { - coordinator.requestRejoin(); + coordinator.requestRejoin("connect worker requested rejoin"); } public void maybeLeaveGroup(String leaveReason) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java index 9178dd852ee7d..a407b4d4e65aa 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java @@ -364,7 +364,7 @@ public void testRejoinGroup() { assertEquals(Collections.singletonList(taskId1x0), rebalanceListener.assignment.tasks()); // and join the group again - coordinator.requestRejoin(); + coordinator.requestRejoin("test"); client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId1), Collections.emptyList(), Errors.NONE)); From 2bdb62cce7ebfb0a39db27e9bcf3abdbf8da633f Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 12 Mar 2021 14:26:57 -0800 Subject: [PATCH 4/5] remove unused imports --- .../kafka/clients/consumer/internals/AbstractCoordinator.java | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 8f6d68a021bd1..6d033ebbd5883 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.AuthenticationException; -import org.apache.kafka.common.errors.CoordinatorLoadInProgressException; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.FencedInstanceIdException; import org.apache.kafka.common.errors.GroupAuthorizationException; From c8cb7ab8291f610289f56e5497d8e1958afc63d5 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 12 Mar 2021 15:24:01 -0800 Subject: [PATCH 5/5] remove unused function --- .../clients/consumer/internals/AbstractCoordinator.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 6d033ebbd5883..82daa7bb1b37d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -978,11 +978,6 @@ synchronized void resetGenerationOnLeaveGroup() { resetStateAndRejoin("consumer pro-actively leaving the group"); } - synchronized void requestRejoinOnResponseError(ApiKeys api, Errors error) { - final String reason = String.format("encountered %s from %s response", error, api); - requestRejoin(reason); - } - public synchronized void requestRejoin(final String reason) { log.info("Request joining group due to: {}", reason); this.rejoinNeeded = true;