diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 082d0eecd120d..fac176c99fe4b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -2289,7 +2289,7 @@ public void enforceRebalance() { if (coordinator == null) { throw new IllegalStateException("Tried to force a rebalance but consumer does not have a group."); } - coordinator.requestRejoin(); + coordinator.requestRejoin("rebalance enforced by user"); } finally { release(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index ab3c33beb876e..82daa7bb1b37d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -455,31 +455,27 @@ boolean joinGroupIfNeeded(final Timer timer) { resetJoinGroupFuture(); needsJoinPrepare = true; } else { - log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " + - "the rebalance callback is triggered, marking this rebalance as failed and retry", - generationSnapshot, stateSnapshot); - resetStateAndRejoin(); + final String reason = String.format("rebalance failed since the generation/state was " + + "modified by heartbeat thread to %s/%s before the rebalance callback triggered", + generationSnapshot, stateSnapshot); + + resetStateAndRejoin(reason); resetJoinGroupFuture(); } } else { final RuntimeException exception = future.exception(); - // we do not need to log error for memberId required, - // since it is not really an error and is transient - if (!(exception instanceof MemberIdRequiredException)) { - log.info("Rebalance failed.", exception); - } - resetJoinGroupFuture(); + if (exception instanceof UnknownMemberIdException || - exception instanceof RebalanceInProgressException || exception instanceof IllegalGenerationException || + exception instanceof RebalanceInProgressException || exception instanceof MemberIdRequiredException) continue; else if (!future.isRetriable()) throw exception; - resetStateAndRejoin(); + resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception)); timer.sleep(rebalanceConfig.retryBackoffMs); } } @@ -655,6 +651,8 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture fut synchronized (AbstractCoordinator.this) { AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, memberId, null); } + requestRejoin("need to re-join with the given member-id"); + future.raise(error); } else if (error == Errors.REBALANCE_IN_PROGRESS) { log.info("JoinGroup failed due to non-fatal error: REBALANCE_IN_PROGRESS, " + @@ -775,8 +773,6 @@ public void handle(SyncGroupResponse syncResponse, } } } else { - requestRejoin(); - if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId)); } else if (error == Errors.REBALANCE_IN_PROGRESS) { @@ -860,7 +856,7 @@ public void onSuccess(ClientResponse resp, RequestFuture future) { @Override public void onFailure(RuntimeException e, RequestFuture future) { - log.debug("FindCoordinator request failed due to {}", e); + log.debug("FindCoordinator request failed due to {}", e.toString()); if (!(e instanceof RetriableException)) { // Remember the exception if fatal so we can ensure it gets thrown by the main thread @@ -961,29 +957,29 @@ protected synchronized String memberId() { return generation.memberId; } - private synchronized void resetState() { + private synchronized void resetStateAndGeneration(final String reason) { + log.info("Resetting generation due to: {}", reason); + state = MemberState.UNJOINED; generation = Generation.NO_GENERATION; } - private synchronized void resetStateAndRejoin() { - resetState(); - rejoinNeeded = true; + private synchronized void resetStateAndRejoin(final String reason) { + resetStateAndGeneration(reason); + requestRejoin(reason); } synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) { - log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api); - - resetStateAndRejoin(); + final String reason = String.format("encountered %s from %s response", error, api); + resetStateAndRejoin(reason); } synchronized void resetGenerationOnLeaveGroup() { - log.debug("Resetting generation due to consumer pro-actively leaving the group"); - - resetStateAndRejoin(); + resetStateAndRejoin("consumer pro-actively leaving the group"); } - public synchronized void requestRejoin() { + public synchronized void requestRejoin(final String reason) { + log.info("Request joining group due to: {}", reason); this.rejoinNeeded = true; } @@ -1120,8 +1116,7 @@ public void handle(HeartbeatResponse heartbeatResponse, RequestFuture futu // since we may be sending the request during rebalance, we should check // this case and ignore the REBALANCE_IN_PROGRESS error if (state == MemberState.STABLE) { - log.info("Attempt to heartbeat failed since group is rebalancing"); - requestRejoin(); + requestRejoin("group is already rebalancing"); future.raise(error); } else { log.debug("Ignoring heartbeat response with error {} during {} state", error, state); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index b415b229fda7c..226297a4d11e8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -373,11 +373,10 @@ protected void onJoinComplete(int generation, Set assignedPartitions = new HashSet<>(assignment.partitions()); if (!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) { - log.warn("We received an assignment {} that doesn't match our current subscription {}; it is likely " + - "that the subscription has changed since we joined the group. Will try re-join the group with current subscription", - assignment.partitions(), subscriptions.prettyString()); - - requestRejoin(); + final String reason = String.format("received assignment %s does not match the current subscription %s; " + + "it is likely that the subscription has changed since we joined the group, will re-join with current subscription", + assignment.partitions(), subscriptions.prettyString()); + requestRejoin(reason); return; } @@ -408,8 +407,9 @@ protected void onJoinComplete(int generation, firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions)); // If revoked any partitions, need to re-join the group afterwards - log.info("Need to revoke partitions {} and re-join the group", revokedPartitions); - requestRejoin(); + final String reason = String.format("need to revoke partitions %s as indicated " + + "by the current assignment and re-join", revokedPartitions); + requestRejoin(reason); } } @@ -769,19 +769,17 @@ public boolean rejoinNeededOrPending() { // we need to rejoin if we performed the assignment and metadata has changed; // also for those owned-but-no-longer-existed partitions we should drop them as lost if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) { - log.info("Requesting to re-join the group and trigger rebalance since the assignment metadata has changed from {} to {}", - assignmentSnapshot, metadataSnapshot); - - requestRejoin(); + final String reason = String.format("cached metadata has changed from %s at the beginning of the rebalance to %s", + assignmentSnapshot, metadataSnapshot); + requestRejoin(reason); return true; } // we need to join if our subscription has changed since the last join if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) { - log.info("Requesting to re-join the group and trigger rebalance since the subscription has changed from {} to {}", + final String reason = String.format("subscription has changed from %s at the beginning of the rebalance to %s", joinedSubscription, subscriptions.subscription()); - - requestRejoin(); + requestRejoin(reason); return true; } @@ -1236,7 +1234,7 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture futu * request re-join but do not reset generations. If the callers decide to retry they * can go ahead and call poll to finish up the rebalance first, and then try commit again. */ - requestRejoin(); + requestRejoin("offset commit failed since group is already rebalancing"); future.raise(new RebalanceInProgressException("Offset commit cannot be completed since the " + "consumer group is executing a rebalance at the moment. You can try completing the rebalance " + "by calling poll() and then retry commit again")); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index c88cf742d0d26..c0606f147ece5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -737,7 +737,7 @@ public void testHeartbeatRebalanceInProgressResponseDuringRebalancing() throws I mockClient.respond(heartbeatResponse(Errors.REBALANCE_IN_PROGRESS)); - coordinator.requestRejoin(); + coordinator.requestRejoin("test"); TestUtils.waitForCondition(() -> { coordinator.ensureActiveGroup(new MockTime(1L).timer(100L)); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java index 110ef00a01b2c..425213fd94b78 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java @@ -101,7 +101,8 @@ public WorkerCoordinator(GroupRebalanceConfig config, } @Override - public void requestRejoin() { + public void requestRejoin(final String reason) { + log.debug("Request joining group due to: {}", reason); rejoinRequested = true; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index 2a9a9f2a2bac6..64576e8915473 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -200,7 +200,7 @@ public String memberId() { } public void requestRejoin() { - coordinator.requestRejoin(); + coordinator.requestRejoin("connect worker requested rejoin"); } public void maybeLeaveGroup(String leaveReason) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java index 9178dd852ee7d..a407b4d4e65aa 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java @@ -364,7 +364,7 @@ public void testRejoinGroup() { assertEquals(Collections.singletonList(taskId1x0), rebalanceListener.assignment.tasks()); // and join the group again - coordinator.requestRejoin(); + coordinator.requestRejoin("test"); client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId1), Collections.emptyList(), Errors.NONE));