From 4169106cdb903a68788d74b55abccd25af9775d1 Mon Sep 17 00:00:00 2001 From: rivensun Date: Sun, 19 Sep 2021 17:59:25 +0800 Subject: [PATCH 01/21] fix issue : KafkaConsumer cannot jump out of the poll method, and the cpu and traffic on the broker side increase sharply JIRA link : https://issues.apache.org/jira/browse/KAFKA-13310 Author: RivenSun2 Reviewers: Luke Chen --- .../internals/AbstractCoordinator.java | 15 +++--- .../internals/ConsumerCoordinator.java | 51 ++++++++++++++----- .../internals/AbstractCoordinatorTest.java | 12 ++--- .../internals/ConsumerCoordinatorTest.java | 34 ++++++------- .../distributed/WorkerCoordinator.java | 2 +- 5 files changed, 72 insertions(+), 42 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 211886153994c..7c85d2e12cd70 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -188,8 +188,9 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig, * cleanup from the previous generation (such as committing offsets for the consumer) * @param generation The previous generation or -1 if there was none * @param memberId The identifier of this member in the previous group or "" if there was none + * @param pollTimer A Timer constructed by the poll() timeout time set by the customer */ - protected abstract void onJoinPrepare(int generation, String memberId); + protected abstract void onJoinPrepare(int generation, String memberId, final Timer pollTimer); /** * Perform assignment for the group. This is used by the leader to push state to all the members @@ -343,7 +344,7 @@ protected synchronized long timeToNextHeartbeat(long now) { * Ensure that the group is active (i.e. joined and synced) */ public void ensureActiveGroup() { - while (!ensureActiveGroup(time.timer(Long.MAX_VALUE))) { + while (!ensureActiveGroup(time.timer(Long.MAX_VALUE), null)) { log.warn("still waiting to ensure active group"); } } @@ -352,10 +353,11 @@ public void ensureActiveGroup() { * Ensure the group is active (i.e., joined and synced) * * @param timer Timer bounding how long this method can block + * @param pollTimer A Timer constructed by the poll() timeout time set by the customer * @throws KafkaException if the callback throws exception * @return true iff the group is active */ - boolean ensureActiveGroup(final Timer timer) { + boolean ensureActiveGroup(final Timer timer, final Timer pollTimer) { // always ensure that the coordinator is ready because we may have been disconnected // when sending heartbeats and does not necessarily require us to rejoin the group. if (!ensureCoordinatorReady(timer)) { @@ -363,7 +365,7 @@ boolean ensureActiveGroup(final Timer timer) { } startHeartbeatThreadIfNeeded(); - return joinGroupIfNeeded(timer); + return joinGroupIfNeeded(timer, pollTimer); } private synchronized void startHeartbeatThreadIfNeeded() { @@ -402,10 +404,11 @@ private void closeHeartbeatThread() { * Visible for testing. * * @param timer Timer bounding how long this method can block + * @param pollTimer A Timer constructed by the poll() timeout time set by the customer * @throws KafkaException if the callback throws exception * @return true iff the operation succeeded */ - boolean joinGroupIfNeeded(final Timer timer) { + boolean joinGroupIfNeeded(final Timer timer, final Timer pollTimer) { while (rejoinNeededOrPending()) { if (!ensureCoordinatorReady(timer)) { return false; @@ -420,7 +423,7 @@ boolean joinGroupIfNeeded(final Timer timer) { // need to set the flag before calling onJoinPrepare since the user callback may throw // exception, in which case upon retry we should not retry onJoinPrepare either. needsJoinPrepare = false; - onJoinPrepare(generation.generationId, generation.memberId); + onJoinPrepare(generation.generationId, generation.memberId, pollTimer); } final RequestFuture future = initiateJoinGroup(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index ed0b793e10d6c..8b2c5effde818 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -66,15 +66,7 @@ import org.slf4j.Logger; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -505,7 +497,7 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) { } // if not wait for join group, we would just use a timer of 0 - if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) { + if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L), timer)) { // since we may use a different timer in the callee, we'd still need // to update the original timer's current time after the call timer.update(time.milliseconds()); @@ -670,10 +662,11 @@ private void validateCooperativeAssignment(final Map allConsumedOffsets = subscriptions.allConsumed(); + + cleanUpConsumedOffsets(allConsumedOffsets); + try { log.debug("Sending synchronous auto-commit of offsets {}", allConsumedOffsets); if (!commitOffsetsSync(allConsumedOffsets, timer)) @@ -1069,6 +1065,37 @@ private void maybeAutoCommitOffsetsSync(Timer timer) { } } + private void cleanUpConsumedOffsets(Map willCommitOffsets) { + + if (willCommitOffsets.isEmpty()) + return; + + Set subscription = subscriptions.subscription(); + Set toGiveUpTopicPartitions = new HashSet<>(); + + Iterator> iterator = willCommitOffsets.entrySet().iterator(); + + while (iterator.hasNext()) { + + Map.Entry entry = iterator.next(); + + if (!subscription.contains(entry.getKey().topic())) { + + toGiveUpTopicPartitions.add(entry.getKey()); + iterator.remove(); + } + + } + + if (toGiveUpTopicPartitions.size() > 0) { + + //Because toGiveUpTopicPartitions may receive `UnknownTopicOrPartitionException` when submitting their offsets. + //We are prepared to abandon them. The worst effect is that these partitions may repeatedly consume some messages + log.warn("Synchronous auto-commit of offsets {} will be abandoned", toGiveUpTopicPartitions); + + } + } + private class DefaultOffsetCommitCallback implements OffsetCommitCallback { @Override public void onComplete(Map offsets, Exception exception) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 384ba911284d2..89705da4b1eb6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -278,7 +278,7 @@ public void testTimeoutAndRetryJoinGroupIfNeeded() throws Exception { ExecutorService executor = Executors.newFixedThreadPool(1); try { Timer firstAttemptTimer = mockTime.timer(REQUEST_TIMEOUT_MS); - Future firstAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(firstAttemptTimer)); + Future firstAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(firstAttemptTimer, null)); mockTime.sleep(REQUEST_TIMEOUT_MS); assertFalse(firstAttempt.get()); @@ -288,7 +288,7 @@ public void testTimeoutAndRetryJoinGroupIfNeeded() throws Exception { mockClient.prepareResponse(syncGroupResponse(Errors.NONE)); Timer secondAttemptTimer = mockTime.timer(REQUEST_TIMEOUT_MS); - Future secondAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(secondAttemptTimer)); + Future secondAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(secondAttemptTimer, null)); assertTrue(secondAttempt.get()); } finally { @@ -614,7 +614,7 @@ public void testNoGenerationWillNotTriggerProtocolNameCheck() { }, syncGroupResponse(Errors.NONE, PROTOCOL_TYPE, PROTOCOL_NAME)); // No exception shall be thrown as the generation is reset. - coordinator.joinGroupIfNeeded(mockTime.timer(100L)); + coordinator.joinGroupIfNeeded(mockTime.timer(100L), null); } private boolean joinGroupWithProtocolTypeAndName(String joinGroupResponseProtocolType, @@ -643,7 +643,7 @@ private boolean joinGroupWithProtocolTypeAndName(String joinGroupResponseProtoco && syncGroupRequest.data().protocolName().equals(PROTOCOL_NAME); }, syncGroupResponse(Errors.NONE, syncGroupResponseProtocolType, syncGroupResponseProtocolName)); - return coordinator.joinGroupIfNeeded(mockTime.timer(5000L)); + return coordinator.joinGroupIfNeeded(mockTime.timer(5000L), null); } @Test @@ -874,7 +874,7 @@ public void testHeartbeatRebalanceInProgressResponseDuringRebalancing() throws I coordinator.requestRejoin("test"); TestUtils.waitForCondition(() -> { - coordinator.ensureActiveGroup(new MockTime(1L).timer(100L)); + coordinator.ensureActiveGroup(new MockTime(1L).timer(100L), null); return !coordinator.heartbeat().hasInflight(); }, 2000, @@ -1574,7 +1574,7 @@ protected Map performAssignment(String leaderId, } @Override - protected void onJoinPrepare(int generation, String memberId) { + protected void onJoinPrepare(int generation, String memberId, Timer pollTimer) { onJoinPrepareInvokes++; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 902ea9957e270..45045b54725a2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -1121,7 +1121,7 @@ public void testNormalJoinGroupFollower() { sync.groupAssignments().isEmpty(); }, syncGroupResponse(assigned, Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(toSet(assigned), subscriptions.assignedPartitions()); @@ -1187,7 +1187,7 @@ public void testPatternJoinGroupFollower() { // expect client to force updating the metadata, if yes gives it both topics client.prepareMetadataUpdate(metadataResponse); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(assigned.size(), subscriptions.numAssignedPartitions()); @@ -1261,7 +1261,7 @@ public void testPendingMemberShouldLeaveGroup() { client.prepareResponse(joinGroupFollowerResponse(-1, consumerId, "leader-id", Errors.MEMBER_ID_REQUIRED)); // execute join group - coordinator.joinGroupIfNeeded(time.timer(0)); + coordinator.joinGroupIfNeeded(time.timer(0),null); final AtomicBoolean received = new AtomicBoolean(false); client.prepareResponse(body -> { @@ -1284,7 +1284,7 @@ public void testUnexpectedErrorOnSyncGroup() { // join initially, but let coordinator rebalance on sync client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN_SERVER_ERROR)); - assertThrows(KafkaException.class, () -> coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE))); + assertThrows(KafkaException.class, () -> coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null)); } @Test @@ -1305,7 +1305,7 @@ public void testUnknownMemberIdOnSyncGroup() { }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); @@ -1326,7 +1326,7 @@ public void testRebalanceInProgressOnSyncGroup() { client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); @@ -1350,7 +1350,7 @@ public void testIllegalGenerationOnSyncGroup() { }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); @@ -1626,7 +1626,7 @@ public void testRejoinGroup() { subscriptions.subscribe(new HashSet<>(Arrays.asList(topic1, otherTopic)), rebalanceListener); client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(assigned, Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null); Collection revoked = getRevoked(assigned, assigned); Collection added = getAdded(assigned, assigned); @@ -1650,7 +1650,7 @@ public void testDisconnectInJoin() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(assigned, Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(toSet(assigned), subscriptions.assignedPartitions()); @@ -1670,7 +1670,7 @@ public void testInvalidSessionTimeout() { // coordinator doesn't like the session timeout client.prepareResponse(joinGroupFollowerResponse(0, consumerId, "", Errors.INVALID_SESSION_TIMEOUT)); - assertThrows(ApiException.class, () -> coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE))); + assertThrows(ApiException.class, () -> coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null)); } @Test @@ -1817,7 +1817,7 @@ public void testAutoCommitDynamicAssignmentRebalance() { client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null); subscriptions.seek(t1p, 100); @@ -2285,7 +2285,7 @@ public void testCommitOffsetRebalanceInProgress() { Map> memberSubscriptions = singletonMap(consumerId, singletonList(topic1)); partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p))); - coordinator.ensureActiveGroup(time.timer(0L)); + coordinator.ensureActiveGroup(time.timer(0L),null); assertTrue(coordinator.rejoinNeededOrPending()); assertNull(coordinator.generationIfStable()); @@ -2801,7 +2801,7 @@ public void testConsumerRejoinAfterRebalance() { MockTime time = new MockTime(1); // onJoinPrepare will be executed and onJoinComplete will not. - boolean res = coordinator.joinGroupIfNeeded(time.timer(2)); + boolean res = coordinator.joinGroupIfNeeded(time.timer(2),null); assertFalse(res); assertFalse(client.hasPendingResponses()); @@ -2818,7 +2818,7 @@ public void testConsumerRejoinAfterRebalance() { client.respond(syncGroupResponse(singletonList(t1p), Errors.NONE)); // Join future should succeed but generation already cleared so result of join is false. - res = coordinator.joinGroupIfNeeded(time.timer(1)); + res = coordinator.joinGroupIfNeeded(time.timer(1),null); assertFalse(res); @@ -2832,7 +2832,7 @@ public void testConsumerRejoinAfterRebalance() { client.respond(joinGroupFollowerResponse(generationId, memberId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - res = coordinator.joinGroupIfNeeded(time.timer(3000)); + res = coordinator.joinGroupIfNeeded(time.timer(3000),null); assertTrue(res); assertFalse(client.hasPendingResponses()); @@ -2914,7 +2914,7 @@ private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGrou subscriptions.subscribe(singleton(topic1), rebalanceListener); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null); } else { subscriptions.assignFromUser(singleton(t1p)); } @@ -3136,7 +3136,7 @@ private void joinAsFollowerAndReceiveAssignment(ConsumerCoordinator coordinator, coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(assignment, Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null); } private void prepareOffsetCommitRequest(Map expectedOffsets, Errors error) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java index 425213fd94b78..36a2ef90446db 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java @@ -218,7 +218,7 @@ protected Map performAssignment(String leaderId, String prot } @Override - protected void onJoinPrepare(int generation, String memberId) { + protected void onJoinPrepare(int generation, String memberId, final Timer pollTimer) { log.info("Rebalance started"); leaderState(null); final ExtendedAssignment localAssignmentSnapshot = assignmentSnapshot; From cd84dbc6e20378265aa4f4d655c2df288c7ed53a Mon Sep 17 00:00:00 2001 From: rivensun Date: Mon, 20 Sep 2021 19:45:14 +0800 Subject: [PATCH 02/21] fix this issue for AUTO_PATTERN and AUTO_TOPICS in cleanUpConsumedOffsets method JIRA link : https://issues.apache.org/jira/browse/KAFKA-13310 Author: RivenSun2 Reviewers: Luke Chen --- .../kafka/clients/consumer/internals/ConsumerCoordinator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 8b2c5effde818..20bf62350aa53 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -1070,7 +1070,7 @@ private void cleanUpConsumedOffsets(Map willC if (willCommitOffsets.isEmpty()) return; - Set subscription = subscriptions.subscription(); + Set validTopics = metadata.fetch().topics(); Set toGiveUpTopicPartitions = new HashSet<>(); Iterator> iterator = willCommitOffsets.entrySet().iterator(); @@ -1079,7 +1079,7 @@ private void cleanUpConsumedOffsets(Map willC Map.Entry entry = iterator.next(); - if (!subscription.contains(entry.getKey().topic())) { + if (!validTopics.contains(entry.getKey().topic())) { toGiveUpTopicPartitions.add(entry.getKey()); iterator.remove(); From 39ee67a145cc15949e4dfac592ab1746acd944c0 Mon Sep 17 00:00:00 2001 From: rivensun Date: Wed, 22 Sep 2021 18:37:08 +0800 Subject: [PATCH 03/21] 1. fix NPE issue 2. Optimize the import of package Author: RivenSun2 Reviewers: Luke Chen --- .../consumer/internals/AbstractCoordinator.java | 2 +- .../consumer/internals/ConsumerCoordinator.java | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 7c85d2e12cd70..b092b238e6afc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -423,7 +423,7 @@ boolean joinGroupIfNeeded(final Timer timer, final Timer pollTimer) { // need to set the flag before calling onJoinPrepare since the user callback may throw // exception, in which case upon retry we should not retry onJoinPrepare either. needsJoinPrepare = false; - onJoinPrepare(generation.generationId, generation.memberId, pollTimer); + onJoinPrepare(generation.generationId, generation.memberId, pollTimer == null ? time.timer(0L) : pollTimer); } final RequestFuture future = initiateJoinGroup(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 20bf62350aa53..8dafad0f128fd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -66,7 +66,16 @@ import org.slf4j.Logger; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; From 9aa7bd7089c5f4bb71e6194a3c2e1321b6eb6d2a Mon Sep 17 00:00:00 2001 From: rivensun Date: Sun, 26 Sep 2021 17:21:12 +0800 Subject: [PATCH 04/21] commit changeCodes by showuon review Author: RivenSun2 Reviewers: Luke Chen --- .../internals/AbstractCoordinator.java | 4 +-- .../internals/ConsumerCoordinator.java | 28 ++++++--------- .../internals/AbstractCoordinatorTest.java | 10 +++--- .../internals/ConsumerCoordinatorTest.java | 34 +++++++++---------- 4 files changed, 34 insertions(+), 42 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index b092b238e6afc..4e35d96cafe87 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -344,7 +344,7 @@ protected synchronized long timeToNextHeartbeat(long now) { * Ensure that the group is active (i.e. joined and synced) */ public void ensureActiveGroup() { - while (!ensureActiveGroup(time.timer(Long.MAX_VALUE), null)) { + while (!ensureActiveGroup(time.timer(Long.MAX_VALUE), time.timer(rebalanceConfig.rebalanceTimeoutMs))) { log.warn("still waiting to ensure active group"); } } @@ -423,7 +423,7 @@ boolean joinGroupIfNeeded(final Timer timer, final Timer pollTimer) { // need to set the flag before calling onJoinPrepare since the user callback may throw // exception, in which case upon retry we should not retry onJoinPrepare either. needsJoinPrepare = false; - onJoinPrepare(generation.generationId, generation.memberId, pollTimer == null ? time.timer(0L) : pollTimer); + onJoinPrepare(generation.generationId, generation.memberId, pollTimer); } final RequestFuture future = initiateJoinGroup(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 8dafad0f128fd..ee8d47ed61d2f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -33,15 +33,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.FencedInstanceIdException; -import org.apache.kafka.common.errors.GroupAuthorizationException; -import org.apache.kafka.common.errors.InterruptException; -import org.apache.kafka.common.errors.UnstableOffsetCommitException; -import org.apache.kafka.common.errors.RebalanceInProgressException; -import org.apache.kafka.common.errors.RetriableException; -import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.errors.TopicAuthorizationException; -import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.errors.*; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.OffsetCommitRequestData; @@ -674,7 +666,6 @@ private void validateCooperativeAssignment(final Map offsets, if (future.failed() && !future.isRetriable()) throw future.exception(); + if(future.exception() instanceof UnknownTopicOrPartitionException) + cleanUpConsumedOffsets(offsets); + timer.sleep(rebalanceConfig.retryBackoffMs); } while (timer.notExpired()); @@ -1057,8 +1051,6 @@ private void maybeAutoCommitOffsetsSync(Timer timer) { if (autoCommitEnabled) { Map allConsumedOffsets = subscriptions.allConsumed(); - cleanUpConsumedOffsets(allConsumedOffsets); - try { log.debug("Sending synchronous auto-commit of offsets {}", allConsumedOffsets); if (!commitOffsetsSync(allConsumedOffsets, timer)) @@ -1074,23 +1066,23 @@ private void maybeAutoCommitOffsetsSync(Timer timer) { } } - private void cleanUpConsumedOffsets(Map willCommitOffsets) { + private void cleanUpConsumedOffsets(Map partitionOffsetsToBeCommitted) { - if (willCommitOffsets.isEmpty()) + if (partitionOffsetsToBeCommitted.isEmpty()) return; Set validTopics = metadata.fetch().topics(); Set toGiveUpTopicPartitions = new HashSet<>(); - Iterator> iterator = willCommitOffsets.entrySet().iterator(); + Iterator iterator = partitionOffsetsToBeCommitted.keySet().iterator(); while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); + TopicPartition topicPartition = iterator.next(); - if (!validTopics.contains(entry.getKey().topic())) { + if (!validTopics.contains(topicPartition.topic())) { - toGiveUpTopicPartitions.add(entry.getKey()); + toGiveUpTopicPartitions.add(topicPartition); iterator.remove(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 89705da4b1eb6..ecf01427f140e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -278,7 +278,7 @@ public void testTimeoutAndRetryJoinGroupIfNeeded() throws Exception { ExecutorService executor = Executors.newFixedThreadPool(1); try { Timer firstAttemptTimer = mockTime.timer(REQUEST_TIMEOUT_MS); - Future firstAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(firstAttemptTimer, null)); + Future firstAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(firstAttemptTimer, mockTime.timer(0))); mockTime.sleep(REQUEST_TIMEOUT_MS); assertFalse(firstAttempt.get()); @@ -288,7 +288,7 @@ public void testTimeoutAndRetryJoinGroupIfNeeded() throws Exception { mockClient.prepareResponse(syncGroupResponse(Errors.NONE)); Timer secondAttemptTimer = mockTime.timer(REQUEST_TIMEOUT_MS); - Future secondAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(secondAttemptTimer, null)); + Future secondAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(secondAttemptTimer, mockTime.timer(0))); assertTrue(secondAttempt.get()); } finally { @@ -614,7 +614,7 @@ public void testNoGenerationWillNotTriggerProtocolNameCheck() { }, syncGroupResponse(Errors.NONE, PROTOCOL_TYPE, PROTOCOL_NAME)); // No exception shall be thrown as the generation is reset. - coordinator.joinGroupIfNeeded(mockTime.timer(100L), null); + coordinator.joinGroupIfNeeded(mockTime.timer(100L), mockTime.timer(0)); } private boolean joinGroupWithProtocolTypeAndName(String joinGroupResponseProtocolType, @@ -643,7 +643,7 @@ private boolean joinGroupWithProtocolTypeAndName(String joinGroupResponseProtoco && syncGroupRequest.data().protocolName().equals(PROTOCOL_NAME); }, syncGroupResponse(Errors.NONE, syncGroupResponseProtocolType, syncGroupResponseProtocolName)); - return coordinator.joinGroupIfNeeded(mockTime.timer(5000L), null); + return coordinator.joinGroupIfNeeded(mockTime.timer(5000L), mockTime.timer(0)); } @Test @@ -874,7 +874,7 @@ public void testHeartbeatRebalanceInProgressResponseDuringRebalancing() throws I coordinator.requestRejoin("test"); TestUtils.waitForCondition(() -> { - coordinator.ensureActiveGroup(new MockTime(1L).timer(100L), null); + coordinator.ensureActiveGroup(new MockTime(1L).timer(100L), new MockTime(1L).timer(0L)); return !coordinator.heartbeat().hasInflight(); }, 2000, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 45045b54725a2..edd6882a9901a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -1121,7 +1121,7 @@ public void testNormalJoinGroupFollower() { sync.groupAssignments().isEmpty(); }, syncGroupResponse(assigned, Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L)); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(toSet(assigned), subscriptions.assignedPartitions()); @@ -1187,7 +1187,7 @@ public void testPatternJoinGroupFollower() { // expect client to force updating the metadata, if yes gives it both topics client.prepareMetadataUpdate(metadataResponse); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L)); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(assigned.size(), subscriptions.numAssignedPartitions()); @@ -1261,7 +1261,7 @@ public void testPendingMemberShouldLeaveGroup() { client.prepareResponse(joinGroupFollowerResponse(-1, consumerId, "leader-id", Errors.MEMBER_ID_REQUIRED)); // execute join group - coordinator.joinGroupIfNeeded(time.timer(0),null); + coordinator.joinGroupIfNeeded(time.timer(0), time.timer(0L)); final AtomicBoolean received = new AtomicBoolean(false); client.prepareResponse(body -> { @@ -1284,7 +1284,7 @@ public void testUnexpectedErrorOnSyncGroup() { // join initially, but let coordinator rebalance on sync client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN_SERVER_ERROR)); - assertThrows(KafkaException.class, () -> coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null)); + assertThrows(KafkaException.class, () -> coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L))); } @Test @@ -1305,7 +1305,7 @@ public void testUnknownMemberIdOnSyncGroup() { }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L)); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); @@ -1326,7 +1326,7 @@ public void testRebalanceInProgressOnSyncGroup() { client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L)); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); @@ -1350,7 +1350,7 @@ public void testIllegalGenerationOnSyncGroup() { }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L)); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); @@ -1626,7 +1626,7 @@ public void testRejoinGroup() { subscriptions.subscribe(new HashSet<>(Arrays.asList(topic1, otherTopic)), rebalanceListener); client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(assigned, Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L)); Collection revoked = getRevoked(assigned, assigned); Collection added = getAdded(assigned, assigned); @@ -1650,7 +1650,7 @@ public void testDisconnectInJoin() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(assigned, Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L)); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(toSet(assigned), subscriptions.assignedPartitions()); @@ -1670,7 +1670,7 @@ public void testInvalidSessionTimeout() { // coordinator doesn't like the session timeout client.prepareResponse(joinGroupFollowerResponse(0, consumerId, "", Errors.INVALID_SESSION_TIMEOUT)); - assertThrows(ApiException.class, () -> coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null)); + assertThrows(ApiException.class, () -> coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L))); } @Test @@ -1817,7 +1817,7 @@ public void testAutoCommitDynamicAssignmentRebalance() { client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L)); subscriptions.seek(t1p, 100); @@ -2285,7 +2285,7 @@ public void testCommitOffsetRebalanceInProgress() { Map> memberSubscriptions = singletonMap(consumerId, singletonList(topic1)); partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p))); - coordinator.ensureActiveGroup(time.timer(0L),null); + coordinator.ensureActiveGroup(time.timer(0L), time.timer(0L)); assertTrue(coordinator.rejoinNeededOrPending()); assertNull(coordinator.generationIfStable()); @@ -2801,7 +2801,7 @@ public void testConsumerRejoinAfterRebalance() { MockTime time = new MockTime(1); // onJoinPrepare will be executed and onJoinComplete will not. - boolean res = coordinator.joinGroupIfNeeded(time.timer(2),null); + boolean res = coordinator.joinGroupIfNeeded(time.timer(2), time.timer(0L)); assertFalse(res); assertFalse(client.hasPendingResponses()); @@ -2818,7 +2818,7 @@ public void testConsumerRejoinAfterRebalance() { client.respond(syncGroupResponse(singletonList(t1p), Errors.NONE)); // Join future should succeed but generation already cleared so result of join is false. - res = coordinator.joinGroupIfNeeded(time.timer(1),null); + res = coordinator.joinGroupIfNeeded(time.timer(1), time.timer(0L)); assertFalse(res); @@ -2832,7 +2832,7 @@ public void testConsumerRejoinAfterRebalance() { client.respond(joinGroupFollowerResponse(generationId, memberId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - res = coordinator.joinGroupIfNeeded(time.timer(3000),null); + res = coordinator.joinGroupIfNeeded(time.timer(3000), time.timer(0L)); assertTrue(res); assertFalse(client.hasPendingResponses()); @@ -2914,7 +2914,7 @@ private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGrou subscriptions.subscribe(singleton(topic1), rebalanceListener); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L)); } else { subscriptions.assignFromUser(singleton(t1p)); } @@ -3136,7 +3136,7 @@ private void joinAsFollowerAndReceiveAssignment(ConsumerCoordinator coordinator, coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(assignment, Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE),null); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L)); } private void prepareOffsetCommitRequest(Map expectedOffsets, Errors error) { From 5e09d0336b624411a8260763aa83d74c38f897f3 Mon Sep 17 00:00:00 2001 From: rivensun Date: Sun, 26 Sep 2021 20:15:41 +0800 Subject: [PATCH 05/21] fix package auto-import issue by IDEA Editor Author: RivenSun2 Reviewers: Luke Chen --- .../consumer/internals/ConsumerCoordinator.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index ee8d47ed61d2f..ee1a7a5945786 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -33,7 +33,16 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.*; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.UnstableOffsetCommitException; +import org.apache.kafka.common.errors.RebalanceInProgressException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.OffsetCommitRequestData; From 8b1382ac50aefa52c41c94b9ff1fb777c815cf9c Mon Sep 17 00:00:00 2001 From: rivensun Date: Fri, 1 Oct 2021 10:28:53 +0800 Subject: [PATCH 06/21] commit reviewed codeChange Author: RivenSun2 Reviewers: Luke Chen --- .../internals/AbstractCoordinator.java | 16 ++++---- .../internals/ConsumerCoordinator.java | 37 ++++++++++++------- .../internals/AbstractCoordinatorTest.java | 12 +++--- .../internals/ConsumerCoordinatorTest.java | 34 ++++++++--------- .../distributed/WorkerCoordinator.java | 2 +- 5 files changed, 55 insertions(+), 46 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 4e35d96cafe87..6107b1cfc6604 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -188,9 +188,9 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig, * cleanup from the previous generation (such as committing offsets for the consumer) * @param generation The previous generation or -1 if there was none * @param memberId The identifier of this member in the previous group or "" if there was none - * @param pollTimer A Timer constructed by the poll() timeout time set by the customer + * @param offsetCommitTimer The timer for committing offsets synchronously */ - protected abstract void onJoinPrepare(int generation, String memberId, final Timer pollTimer); + protected abstract void onJoinPrepare(int generation, String memberId, final Timer offsetCommitTimer); /** * Perform assignment for the group. This is used by the leader to push state to all the members @@ -353,11 +353,11 @@ public void ensureActiveGroup() { * Ensure the group is active (i.e., joined and synced) * * @param timer Timer bounding how long this method can block - * @param pollTimer A Timer constructed by the poll() timeout time set by the customer + * @param offsetCommitTimer The timer for committing offsets synchronously * @throws KafkaException if the callback throws exception * @return true iff the group is active */ - boolean ensureActiveGroup(final Timer timer, final Timer pollTimer) { + boolean ensureActiveGroup(final Timer timer, final Timer offsetCommitTimer) { // always ensure that the coordinator is ready because we may have been disconnected // when sending heartbeats and does not necessarily require us to rejoin the group. if (!ensureCoordinatorReady(timer)) { @@ -365,7 +365,7 @@ boolean ensureActiveGroup(final Timer timer, final Timer pollTimer) { } startHeartbeatThreadIfNeeded(); - return joinGroupIfNeeded(timer, pollTimer); + return joinGroupIfNeeded(timer, offsetCommitTimer); } private synchronized void startHeartbeatThreadIfNeeded() { @@ -404,11 +404,11 @@ private void closeHeartbeatThread() { * Visible for testing. * * @param timer Timer bounding how long this method can block - * @param pollTimer A Timer constructed by the poll() timeout time set by the customer + * @param offsetCommitTimer The timer for committing offsets synchronously * @throws KafkaException if the callback throws exception * @return true iff the operation succeeded */ - boolean joinGroupIfNeeded(final Timer timer, final Timer pollTimer) { + boolean joinGroupIfNeeded(final Timer timer, final Timer offsetCommitTimer) { while (rejoinNeededOrPending()) { if (!ensureCoordinatorReady(timer)) { return false; @@ -423,7 +423,7 @@ boolean joinGroupIfNeeded(final Timer timer, final Timer pollTimer) { // need to set the flag before calling onJoinPrepare since the user callback may throw // exception, in which case upon retry we should not retry onJoinPrepare either. needsJoinPrepare = false; - onJoinPrepare(generation.generationId, generation.memberId, pollTimer); + onJoinPrepare(generation.generationId, generation.memberId, offsetCommitTimer); } final RequestFuture future = initiateJoinGroup(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index ee1a7a5945786..24b800b72ad5e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -672,10 +672,10 @@ private void validateCooperativeAssignment(final Map offsets, if (offsets.isEmpty()) return true; + boolean shouldCleanUpConsumedOffsets = !checkConsumedOffsetsAreValid(offsets); do { if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) { return false; } + if (shouldCleanUpConsumedOffsets) { + cleanUpConsumedOffsets(offsets); + shouldCleanUpConsumedOffsets = false; + } RequestFuture future = sendOffsetCommitRequest(offsets); client.poll(future, timer); @@ -1019,7 +1024,7 @@ public boolean commitOffsetsSync(Map offsets, throw future.exception(); if(future.exception() instanceof UnknownTopicOrPartitionException) - cleanUpConsumedOffsets(offsets); + shouldCleanUpConsumedOffsets = true; timer.sleep(rebalanceConfig.retryBackoffMs); } while (timer.notExpired()); @@ -1059,7 +1064,6 @@ private void doAutoCommitOffsetsAsync() { private void maybeAutoCommitOffsetsSync(Timer timer) { if (autoCommitEnabled) { Map allConsumedOffsets = subscriptions.allConsumed(); - try { log.debug("Sending synchronous auto-commit of offsets {}", allConsumedOffsets); if (!commitOffsetsSync(allConsumedOffsets, timer)) @@ -1075,8 +1079,20 @@ private void maybeAutoCommitOffsetsSync(Timer timer) { } } - private void cleanUpConsumedOffsets(Map partitionOffsetsToBeCommitted) { + private boolean checkConsumedOffsetsAreValid(Map partitionOffsetsToBeCommitted) { + if (partitionOffsetsToBeCommitted.isEmpty()) + return true; + Set validTopics = metadata.fetch().topics(); + for (TopicPartition topicPartition : partitionOffsetsToBeCommitted.keySet()) { + if (!validTopics.contains(topicPartition.topic())) { + return false; + } + } + return true; + } + + private void cleanUpConsumedOffsets(Map partitionOffsetsToBeCommitted) { if (partitionOffsetsToBeCommitted.isEmpty()) return; @@ -1084,25 +1100,18 @@ private void cleanUpConsumedOffsets(Map parti Set toGiveUpTopicPartitions = new HashSet<>(); Iterator iterator = partitionOffsetsToBeCommitted.keySet().iterator(); - while (iterator.hasNext()) { - TopicPartition topicPartition = iterator.next(); - if (!validTopics.contains(topicPartition.topic())) { - toGiveUpTopicPartitions.add(topicPartition); iterator.remove(); } - } if (toGiveUpTopicPartitions.size() > 0) { - - //Because toGiveUpTopicPartitions may receive `UnknownTopicOrPartitionException` when submitting their offsets. - //We are prepared to abandon them. The worst effect is that these partitions may repeatedly consume some messages + //We might get `UnknownTopicOrPartitionException` after submitting their offsets due to topics been deleted. We should update the offsets list here. + // The worst effect is that we may keep retrying to commit the offsets for the topics not existed any more, before timeout reached. log.warn("Synchronous auto-commit of offsets {} will be abandoned", toGiveUpTopicPartitions); - } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index ecf01427f140e..6ad92517ac7dd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -278,7 +278,7 @@ public void testTimeoutAndRetryJoinGroupIfNeeded() throws Exception { ExecutorService executor = Executors.newFixedThreadPool(1); try { Timer firstAttemptTimer = mockTime.timer(REQUEST_TIMEOUT_MS); - Future firstAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(firstAttemptTimer, mockTime.timer(0))); + Future firstAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(firstAttemptTimer, mockTime.timer(10))); mockTime.sleep(REQUEST_TIMEOUT_MS); assertFalse(firstAttempt.get()); @@ -288,7 +288,7 @@ public void testTimeoutAndRetryJoinGroupIfNeeded() throws Exception { mockClient.prepareResponse(syncGroupResponse(Errors.NONE)); Timer secondAttemptTimer = mockTime.timer(REQUEST_TIMEOUT_MS); - Future secondAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(secondAttemptTimer, mockTime.timer(0))); + Future secondAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(secondAttemptTimer, mockTime.timer(10))); assertTrue(secondAttempt.get()); } finally { @@ -614,7 +614,7 @@ public void testNoGenerationWillNotTriggerProtocolNameCheck() { }, syncGroupResponse(Errors.NONE, PROTOCOL_TYPE, PROTOCOL_NAME)); // No exception shall be thrown as the generation is reset. - coordinator.joinGroupIfNeeded(mockTime.timer(100L), mockTime.timer(0)); + coordinator.joinGroupIfNeeded(mockTime.timer(100L), mockTime.timer(10)); } private boolean joinGroupWithProtocolTypeAndName(String joinGroupResponseProtocolType, @@ -643,7 +643,7 @@ private boolean joinGroupWithProtocolTypeAndName(String joinGroupResponseProtoco && syncGroupRequest.data().protocolName().equals(PROTOCOL_NAME); }, syncGroupResponse(Errors.NONE, syncGroupResponseProtocolType, syncGroupResponseProtocolName)); - return coordinator.joinGroupIfNeeded(mockTime.timer(5000L), mockTime.timer(0)); + return coordinator.joinGroupIfNeeded(mockTime.timer(5000L), mockTime.timer(10)); } @Test @@ -874,7 +874,7 @@ public void testHeartbeatRebalanceInProgressResponseDuringRebalancing() throws I coordinator.requestRejoin("test"); TestUtils.waitForCondition(() -> { - coordinator.ensureActiveGroup(new MockTime(1L).timer(100L), new MockTime(1L).timer(0L)); + coordinator.ensureActiveGroup(new MockTime(1L).timer(100L), new MockTime(1L).timer(10L)); return !coordinator.heartbeat().hasInflight(); }, 2000, @@ -1574,7 +1574,7 @@ protected Map performAssignment(String leaderId, } @Override - protected void onJoinPrepare(int generation, String memberId, Timer pollTimer) { + protected void onJoinPrepare(int generation, String memberId, Timer offsetCommitTimer) { onJoinPrepareInvokes++; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index edd6882a9901a..3575cadd25ac8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -1121,7 +1121,7 @@ public void testNormalJoinGroupFollower() { sync.groupAssignments().isEmpty(); }, syncGroupResponse(assigned, Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10)); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(toSet(assigned), subscriptions.assignedPartitions()); @@ -1187,7 +1187,7 @@ public void testPatternJoinGroupFollower() { // expect client to force updating the metadata, if yes gives it both topics client.prepareMetadataUpdate(metadataResponse); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10)); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(assigned.size(), subscriptions.numAssignedPartitions()); @@ -1261,7 +1261,7 @@ public void testPendingMemberShouldLeaveGroup() { client.prepareResponse(joinGroupFollowerResponse(-1, consumerId, "leader-id", Errors.MEMBER_ID_REQUIRED)); // execute join group - coordinator.joinGroupIfNeeded(time.timer(0), time.timer(0L)); + coordinator.joinGroupIfNeeded(time.timer(0), time.timer(10)); final AtomicBoolean received = new AtomicBoolean(false); client.prepareResponse(body -> { @@ -1284,7 +1284,7 @@ public void testUnexpectedErrorOnSyncGroup() { // join initially, but let coordinator rebalance on sync client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN_SERVER_ERROR)); - assertThrows(KafkaException.class, () -> coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L))); + assertThrows(KafkaException.class, () -> coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10))); } @Test @@ -1305,7 +1305,7 @@ public void testUnknownMemberIdOnSyncGroup() { }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10)); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); @@ -1326,7 +1326,7 @@ public void testRebalanceInProgressOnSyncGroup() { client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10)); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); @@ -1350,7 +1350,7 @@ public void testIllegalGenerationOnSyncGroup() { }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10)); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); @@ -1626,7 +1626,7 @@ public void testRejoinGroup() { subscriptions.subscribe(new HashSet<>(Arrays.asList(topic1, otherTopic)), rebalanceListener); client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(assigned, Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10)); Collection revoked = getRevoked(assigned, assigned); Collection added = getAdded(assigned, assigned); @@ -1650,7 +1650,7 @@ public void testDisconnectInJoin() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(assigned, Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10)); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(toSet(assigned), subscriptions.assignedPartitions()); @@ -1670,7 +1670,7 @@ public void testInvalidSessionTimeout() { // coordinator doesn't like the session timeout client.prepareResponse(joinGroupFollowerResponse(0, consumerId, "", Errors.INVALID_SESSION_TIMEOUT)); - assertThrows(ApiException.class, () -> coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L))); + assertThrows(ApiException.class, () -> coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10))); } @Test @@ -1817,7 +1817,7 @@ public void testAutoCommitDynamicAssignmentRebalance() { client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10)); subscriptions.seek(t1p, 100); @@ -2285,7 +2285,7 @@ public void testCommitOffsetRebalanceInProgress() { Map> memberSubscriptions = singletonMap(consumerId, singletonList(topic1)); partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p))); - coordinator.ensureActiveGroup(time.timer(0L), time.timer(0L)); + coordinator.ensureActiveGroup(time.timer(0L), time.timer(10L)); assertTrue(coordinator.rejoinNeededOrPending()); assertNull(coordinator.generationIfStable()); @@ -2801,7 +2801,7 @@ public void testConsumerRejoinAfterRebalance() { MockTime time = new MockTime(1); // onJoinPrepare will be executed and onJoinComplete will not. - boolean res = coordinator.joinGroupIfNeeded(time.timer(2), time.timer(0L)); + boolean res = coordinator.joinGroupIfNeeded(time.timer(2), time.timer(10)); assertFalse(res); assertFalse(client.hasPendingResponses()); @@ -2818,7 +2818,7 @@ public void testConsumerRejoinAfterRebalance() { client.respond(syncGroupResponse(singletonList(t1p), Errors.NONE)); // Join future should succeed but generation already cleared so result of join is false. - res = coordinator.joinGroupIfNeeded(time.timer(1), time.timer(0L)); + res = coordinator.joinGroupIfNeeded(time.timer(1), time.timer(10)); assertFalse(res); @@ -2832,7 +2832,7 @@ public void testConsumerRejoinAfterRebalance() { client.respond(joinGroupFollowerResponse(generationId, memberId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - res = coordinator.joinGroupIfNeeded(time.timer(3000), time.timer(0L)); + res = coordinator.joinGroupIfNeeded(time.timer(3000), time.timer(10)); assertTrue(res); assertFalse(client.hasPendingResponses()); @@ -2914,7 +2914,7 @@ private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGrou subscriptions.subscribe(singleton(topic1), rebalanceListener); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10)); } else { subscriptions.assignFromUser(singleton(t1p)); } @@ -3136,7 +3136,7 @@ private void joinAsFollowerAndReceiveAssignment(ConsumerCoordinator coordinator, coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(assignment, Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(0L)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10)); } private void prepareOffsetCommitRequest(Map expectedOffsets, Errors error) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java index 36a2ef90446db..373a522c9c2ff 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java @@ -218,7 +218,7 @@ protected Map performAssignment(String leaderId, String prot } @Override - protected void onJoinPrepare(int generation, String memberId, final Timer pollTimer) { + protected void onJoinPrepare(int generation, String memberId, final Timer offsetCommitTimer) { log.info("Rebalance started"); leaderState(null); final ExtendedAssignment localAssignmentSnapshot = assignmentSnapshot; From 7346e88795627b46eb014aa791806cb7eee2f9ea Mon Sep 17 00:00:00 2001 From: rivensun Date: Sat, 30 Oct 2021 19:47:40 +0800 Subject: [PATCH 07/21] commit reviewed codeChange Author: RivenSun2 Reviewers: Luke Chen --- .../internals/AbstractCoordinator.java | 23 ++-- .../internals/ConsumerCoordinator.java | 104 +++++++++--------- .../internals/AbstractCoordinatorTest.java | 13 ++- .../internals/ConsumerCoordinatorTest.java | 34 +++--- .../distributed/WorkerCoordinator.java | 3 +- 5 files changed, 88 insertions(+), 89 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 6107b1cfc6604..1a5d06ed9c6ce 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -188,9 +188,9 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig, * cleanup from the previous generation (such as committing offsets for the consumer) * @param generation The previous generation or -1 if there was none * @param memberId The identifier of this member in the previous group or "" if there was none - * @param offsetCommitTimer The timer for committing offsets synchronously + * @return true If onJoinPrepare async commit succeeded, false otherwise */ - protected abstract void onJoinPrepare(int generation, String memberId, final Timer offsetCommitTimer); + protected abstract boolean onJoinPrepare(int generation, String memberId); /** * Perform assignment for the group. This is used by the leader to push state to all the members @@ -344,7 +344,7 @@ protected synchronized long timeToNextHeartbeat(long now) { * Ensure that the group is active (i.e. joined and synced) */ public void ensureActiveGroup() { - while (!ensureActiveGroup(time.timer(Long.MAX_VALUE), time.timer(rebalanceConfig.rebalanceTimeoutMs))) { + while (!ensureActiveGroup(time.timer(Long.MAX_VALUE))) { log.warn("still waiting to ensure active group"); } } @@ -353,11 +353,10 @@ public void ensureActiveGroup() { * Ensure the group is active (i.e., joined and synced) * * @param timer Timer bounding how long this method can block - * @param offsetCommitTimer The timer for committing offsets synchronously * @throws KafkaException if the callback throws exception * @return true iff the group is active */ - boolean ensureActiveGroup(final Timer timer, final Timer offsetCommitTimer) { + boolean ensureActiveGroup(final Timer timer) { // always ensure that the coordinator is ready because we may have been disconnected // when sending heartbeats and does not necessarily require us to rejoin the group. if (!ensureCoordinatorReady(timer)) { @@ -365,7 +364,7 @@ boolean ensureActiveGroup(final Timer timer, final Timer offsetCommitTimer) { } startHeartbeatThreadIfNeeded(); - return joinGroupIfNeeded(timer, offsetCommitTimer); + return joinGroupIfNeeded(timer); } private synchronized void startHeartbeatThreadIfNeeded() { @@ -404,11 +403,10 @@ private void closeHeartbeatThread() { * Visible for testing. * * @param timer Timer bounding how long this method can block - * @param offsetCommitTimer The timer for committing offsets synchronously * @throws KafkaException if the callback throws exception - * @return true iff the operation succeeded + * @return true if the operation succeeded */ - boolean joinGroupIfNeeded(final Timer timer, final Timer offsetCommitTimer) { + boolean joinGroupIfNeeded(final Timer timer) { while (rejoinNeededOrPending()) { if (!ensureCoordinatorReady(timer)) { return false; @@ -423,9 +421,14 @@ boolean joinGroupIfNeeded(final Timer timer, final Timer offsetCommitTimer) { // need to set the flag before calling onJoinPrepare since the user callback may throw // exception, in which case upon retry we should not retry onJoinPrepare either. needsJoinPrepare = false; - onJoinPrepare(generation.generationId, generation.memberId, offsetCommitTimer); + if (!onJoinPrepare(generation.generationId, generation.memberId)) + needsJoinPrepare = true; } + //should not initiateJoinGroup if needsJoinPrepare still is true + if (needsJoinPrepare) + return false; + final RequestFuture future = initiateJoinGroup(); client.poll(future, timer); if (!future.isDone()) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 24b800b72ad5e..3834c37ff550a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -41,7 +41,6 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; -import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; @@ -76,7 +75,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -95,6 +93,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private final SubscriptionState subscriptions; private final OffsetCommitCallback defaultOffsetCommitCallback; private final boolean autoCommitEnabled; + private RequestFuture onJoinPrepareAsyncCommitFuture = null; private final int autoCommitIntervalMs; private final ConsumerInterceptors interceptors; private final AtomicInteger pendingAsyncCommits; @@ -507,7 +506,7 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) { } // if not wait for join group, we would just use a timer of 0 - if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L), timer)) { + if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) { // since we may use a different timer in the callee, we'd still need // to update the original timer's current time after the call timer.update(time.milliseconds()); @@ -672,10 +671,18 @@ private void validateCooperativeAssignment(final Map offsets, final OffsetCommitCallback callback) { + public RequestFuture commitOffsetsAsync(final Map offsets, final OffsetCommitCallback callback) { invokeCompletedOffsetCommitCallbacks(); + RequestFuture future = null; if (!coordinatorUnknown()) { - doCommitOffsetsAsync(offsets, callback); + future = doCommitOffsetsAsync(offsets, callback); } else { // we don't know the current coordinator, so try to find it and then send the commit // or fail (we don't want recursive retries which can cause offset commits to arrive @@ -951,9 +960,10 @@ public void onFailure(RuntimeException e) { // Note that commits are treated as heartbeats by the coordinator, so there is no need to // explicitly allow heartbeats through delayed task execution. client.pollNoWakeup(); + return future; } - private void doCommitOffsetsAsync(final Map offsets, final OffsetCommitCallback callback) { + private RequestFuture doCommitOffsetsAsync(final Map offsets, final OffsetCommitCallback callback) { RequestFuture future = sendOffsetCommitRequest(offsets); final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback; future.addListener(new RequestFutureListener() { @@ -977,6 +987,7 @@ public void onFailure(RuntimeException e) { } } }); + return future; } /** @@ -996,16 +1007,11 @@ public boolean commitOffsetsSync(Map offsets, if (offsets.isEmpty()) return true; - boolean shouldCleanUpConsumedOffsets = !checkConsumedOffsetsAreValid(offsets); do { if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) { return false; } - if (shouldCleanUpConsumedOffsets) { - cleanUpConsumedOffsets(offsets); - shouldCleanUpConsumedOffsets = false; - } RequestFuture future = sendOffsetCommitRequest(offsets); client.poll(future, timer); @@ -1023,9 +1029,6 @@ public boolean commitOffsetsSync(Map offsets, if (future.failed() && !future.isRetriable()) throw future.exception(); - if(future.exception() instanceof UnknownTopicOrPartitionException) - shouldCleanUpConsumedOffsets = true; - timer.sleep(rebalanceConfig.retryBackoffMs); } while (timer.notExpired()); @@ -1042,11 +1045,11 @@ public void maybeAutoCommitOffsetsAsync(long now) { } } - private void doAutoCommitOffsetsAsync() { + private RequestFuture doAutoCommitOffsetsAsync() { Map allConsumedOffsets = subscriptions.allConsumed(); log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets); - commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> { + return commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> { if (exception != null) { if (exception instanceof RetriableCommitFailedException) { log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets, @@ -1061,6 +1064,33 @@ private void doAutoCommitOffsetsAsync() { }); } + private boolean maybeAutoCommitOffsetsAsync() { + if (autoCommitEnabled) { + invokeCompletedOffsetCommitCallbacks(); + + if (onJoinPrepareAsyncCommitFuture == null) + onJoinPrepareAsyncCommitFuture = doAutoCommitOffsetsAsync(); + if (onJoinPrepareAsyncCommitFuture == null) + return true; + + client.pollNoWakeup(); + invokeCompletedOffsetCommitCallbacks(); + + if (!onJoinPrepareAsyncCommitFuture.isDone()) + return false; + if (onJoinPrepareAsyncCommitFuture.succeeded()) { + onJoinPrepareAsyncCommitFuture = null; + return true; + } + if (onJoinPrepareAsyncCommitFuture.failed() && !onJoinPrepareAsyncCommitFuture.isRetriable()) + throw onJoinPrepareAsyncCommitFuture.exception(); + //retry Sending asynchronous auto-commit + onJoinPrepareAsyncCommitFuture = doAutoCommitOffsetsAsync(); + return false; + } else + return true; + } + private void maybeAutoCommitOffsetsSync(Timer timer) { if (autoCommitEnabled) { Map allConsumedOffsets = subscriptions.allConsumed(); @@ -1079,42 +1109,6 @@ private void maybeAutoCommitOffsetsSync(Timer timer) { } } - private boolean checkConsumedOffsetsAreValid(Map partitionOffsetsToBeCommitted) { - if (partitionOffsetsToBeCommitted.isEmpty()) - return true; - - Set validTopics = metadata.fetch().topics(); - for (TopicPartition topicPartition : partitionOffsetsToBeCommitted.keySet()) { - if (!validTopics.contains(topicPartition.topic())) { - return false; - } - } - return true; - } - - private void cleanUpConsumedOffsets(Map partitionOffsetsToBeCommitted) { - if (partitionOffsetsToBeCommitted.isEmpty()) - return; - - Set validTopics = metadata.fetch().topics(); - Set toGiveUpTopicPartitions = new HashSet<>(); - - Iterator iterator = partitionOffsetsToBeCommitted.keySet().iterator(); - while (iterator.hasNext()) { - TopicPartition topicPartition = iterator.next(); - if (!validTopics.contains(topicPartition.topic())) { - toGiveUpTopicPartitions.add(topicPartition); - iterator.remove(); - } - } - - if (toGiveUpTopicPartitions.size() > 0) { - //We might get `UnknownTopicOrPartitionException` after submitting their offsets due to topics been deleted. We should update the offsets list here. - // The worst effect is that we may keep retrying to commit the offsets for the topics not existed any more, before timeout reached. - log.warn("Synchronous auto-commit of offsets {} will be abandoned", toGiveUpTopicPartitions); - } - } - private class DefaultOffsetCommitCallback implements OffsetCommitCallback { @Override public void onComplete(Map offsets, Exception exception) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 6ad92517ac7dd..5d4e8088bbf33 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -278,7 +278,7 @@ public void testTimeoutAndRetryJoinGroupIfNeeded() throws Exception { ExecutorService executor = Executors.newFixedThreadPool(1); try { Timer firstAttemptTimer = mockTime.timer(REQUEST_TIMEOUT_MS); - Future firstAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(firstAttemptTimer, mockTime.timer(10))); + Future firstAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(firstAttemptTimer)); mockTime.sleep(REQUEST_TIMEOUT_MS); assertFalse(firstAttempt.get()); @@ -288,7 +288,7 @@ public void testTimeoutAndRetryJoinGroupIfNeeded() throws Exception { mockClient.prepareResponse(syncGroupResponse(Errors.NONE)); Timer secondAttemptTimer = mockTime.timer(REQUEST_TIMEOUT_MS); - Future secondAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(secondAttemptTimer, mockTime.timer(10))); + Future secondAttempt = executor.submit(() -> coordinator.joinGroupIfNeeded(secondAttemptTimer)); assertTrue(secondAttempt.get()); } finally { @@ -614,7 +614,7 @@ public void testNoGenerationWillNotTriggerProtocolNameCheck() { }, syncGroupResponse(Errors.NONE, PROTOCOL_TYPE, PROTOCOL_NAME)); // No exception shall be thrown as the generation is reset. - coordinator.joinGroupIfNeeded(mockTime.timer(100L), mockTime.timer(10)); + coordinator.joinGroupIfNeeded(mockTime.timer(100L)); } private boolean joinGroupWithProtocolTypeAndName(String joinGroupResponseProtocolType, @@ -643,7 +643,7 @@ private boolean joinGroupWithProtocolTypeAndName(String joinGroupResponseProtoco && syncGroupRequest.data().protocolName().equals(PROTOCOL_NAME); }, syncGroupResponse(Errors.NONE, syncGroupResponseProtocolType, syncGroupResponseProtocolName)); - return coordinator.joinGroupIfNeeded(mockTime.timer(5000L), mockTime.timer(10)); + return coordinator.joinGroupIfNeeded(mockTime.timer(5000L)); } @Test @@ -874,7 +874,7 @@ public void testHeartbeatRebalanceInProgressResponseDuringRebalancing() throws I coordinator.requestRejoin("test"); TestUtils.waitForCondition(() -> { - coordinator.ensureActiveGroup(new MockTime(1L).timer(100L), new MockTime(1L).timer(10L)); + coordinator.ensureActiveGroup(new MockTime(1L).timer(100L)); return !coordinator.heartbeat().hasInflight(); }, 2000, @@ -1574,8 +1574,9 @@ protected Map performAssignment(String leaderId, } @Override - protected void onJoinPrepare(int generation, String memberId, Timer offsetCommitTimer) { + protected boolean onJoinPrepare(int generation, String memberId) { onJoinPrepareInvokes++; + return true; } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 3575cadd25ac8..902ea9957e270 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -1121,7 +1121,7 @@ public void testNormalJoinGroupFollower() { sync.groupAssignments().isEmpty(); }, syncGroupResponse(assigned, Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(toSet(assigned), subscriptions.assignedPartitions()); @@ -1187,7 +1187,7 @@ public void testPatternJoinGroupFollower() { // expect client to force updating the metadata, if yes gives it both topics client.prepareMetadataUpdate(metadataResponse); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(assigned.size(), subscriptions.numAssignedPartitions()); @@ -1261,7 +1261,7 @@ public void testPendingMemberShouldLeaveGroup() { client.prepareResponse(joinGroupFollowerResponse(-1, consumerId, "leader-id", Errors.MEMBER_ID_REQUIRED)); // execute join group - coordinator.joinGroupIfNeeded(time.timer(0), time.timer(10)); + coordinator.joinGroupIfNeeded(time.timer(0)); final AtomicBoolean received = new AtomicBoolean(false); client.prepareResponse(body -> { @@ -1284,7 +1284,7 @@ public void testUnexpectedErrorOnSyncGroup() { // join initially, but let coordinator rebalance on sync client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN_SERVER_ERROR)); - assertThrows(KafkaException.class, () -> coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10))); + assertThrows(KafkaException.class, () -> coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE))); } @Test @@ -1305,7 +1305,7 @@ public void testUnknownMemberIdOnSyncGroup() { }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); @@ -1326,7 +1326,7 @@ public void testRebalanceInProgressOnSyncGroup() { client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); @@ -1350,7 +1350,7 @@ public void testIllegalGenerationOnSyncGroup() { }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(singleton(t1p), subscriptions.assignedPartitions()); @@ -1626,7 +1626,7 @@ public void testRejoinGroup() { subscriptions.subscribe(new HashSet<>(Arrays.asList(topic1, otherTopic)), rebalanceListener); client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(assigned, Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); Collection revoked = getRevoked(assigned, assigned); Collection added = getAdded(assigned, assigned); @@ -1650,7 +1650,7 @@ public void testDisconnectInJoin() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(assigned, Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); assertFalse(coordinator.rejoinNeededOrPending()); assertEquals(toSet(assigned), subscriptions.assignedPartitions()); @@ -1670,7 +1670,7 @@ public void testInvalidSessionTimeout() { // coordinator doesn't like the session timeout client.prepareResponse(joinGroupFollowerResponse(0, consumerId, "", Errors.INVALID_SESSION_TIMEOUT)); - assertThrows(ApiException.class, () -> coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10))); + assertThrows(ApiException.class, () -> coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE))); } @Test @@ -1817,7 +1817,7 @@ public void testAutoCommitDynamicAssignmentRebalance() { client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); subscriptions.seek(t1p, 100); @@ -2285,7 +2285,7 @@ public void testCommitOffsetRebalanceInProgress() { Map> memberSubscriptions = singletonMap(consumerId, singletonList(topic1)); partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p))); - coordinator.ensureActiveGroup(time.timer(0L), time.timer(10L)); + coordinator.ensureActiveGroup(time.timer(0L)); assertTrue(coordinator.rejoinNeededOrPending()); assertNull(coordinator.generationIfStable()); @@ -2801,7 +2801,7 @@ public void testConsumerRejoinAfterRebalance() { MockTime time = new MockTime(1); // onJoinPrepare will be executed and onJoinComplete will not. - boolean res = coordinator.joinGroupIfNeeded(time.timer(2), time.timer(10)); + boolean res = coordinator.joinGroupIfNeeded(time.timer(2)); assertFalse(res); assertFalse(client.hasPendingResponses()); @@ -2818,7 +2818,7 @@ public void testConsumerRejoinAfterRebalance() { client.respond(syncGroupResponse(singletonList(t1p), Errors.NONE)); // Join future should succeed but generation already cleared so result of join is false. - res = coordinator.joinGroupIfNeeded(time.timer(1), time.timer(10)); + res = coordinator.joinGroupIfNeeded(time.timer(1)); assertFalse(res); @@ -2832,7 +2832,7 @@ public void testConsumerRejoinAfterRebalance() { client.respond(joinGroupFollowerResponse(generationId, memberId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - res = coordinator.joinGroupIfNeeded(time.timer(3000), time.timer(10)); + res = coordinator.joinGroupIfNeeded(time.timer(3000)); assertTrue(res); assertFalse(client.hasPendingResponses()); @@ -2914,7 +2914,7 @@ private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGrou subscriptions.subscribe(singleton(topic1), rebalanceListener); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); } else { subscriptions.assignFromUser(singleton(t1p)); } @@ -3136,7 +3136,7 @@ private void joinAsFollowerAndReceiveAssignment(ConsumerCoordinator coordinator, coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); client.prepareResponse(syncGroupResponse(assignment, Errors.NONE)); - coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE), time.timer(10)); + coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); } private void prepareOffsetCommitRequest(Map expectedOffsets, Errors error) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java index 373a522c9c2ff..1d85b28605d14 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java @@ -218,7 +218,7 @@ protected Map performAssignment(String leaderId, String prot } @Override - protected void onJoinPrepare(int generation, String memberId, final Timer offsetCommitTimer) { + protected boolean onJoinPrepare(int generation, String memberId) { log.info("Rebalance started"); leaderState(null); final ExtendedAssignment localAssignmentSnapshot = assignmentSnapshot; @@ -230,6 +230,7 @@ protected void onJoinPrepare(int generation, String memberId, final Timer offset log.debug("Cooperative rebalance triggered. Keeping assignment {} until it's " + "explicitly revoked.", localAssignmentSnapshot); } + return true; } @Override From 0a77af1e56247b6577b28340c19d36ece4ade56e Mon Sep 17 00:00:00 2001 From: rivensun Date: Sun, 31 Oct 2021 12:17:31 +0800 Subject: [PATCH 08/21] commit test code add test Method "testForceMetadataDeleteForPatternSubscriptionDuringRebalance()" Author: RivenSun2 Reviewers: Luke Chen --- .../internals/ConsumerCoordinatorTest.java | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 902ea9957e270..7c02e05b31990 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -148,6 +148,11 @@ public abstract class ConsumerCoordinatorTest { put(topic2, 1); } }); + private MetadataResponse deletedMetadataResponse = RequestTestUtils.metadataUpdateWith(1, new HashMap() { + { + put(topic1, 1); + } + }); private Node node = metadataResponse.brokers().iterator().next(); private SubscriptionState subscriptions; private ConsumerMetadata metadata; @@ -2840,6 +2845,49 @@ public void testConsumerRejoinAfterRebalance() { } } + @Test + public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() { + try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) { + subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener); + client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, new HashMap() { + { + put(topic1, 1); + put(topic2, 1); + } + })); + coordinator.maybeUpdateSubscriptionMetadata(); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), subscriptions.subscription()); + + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + // Instrument the test so that metadata will contain only one topic after next refresh. + client.prepareMetadataUpdate(deletedMetadataResponse); + + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); + client.prepareResponse(body -> { + SyncGroupRequest sync = (SyncGroupRequest) body; + return sync.data().memberId().equals(consumerId) && + sync.data().generationId() == 1 && + sync.groupAssignments().isEmpty(); + }, syncGroupResponse(singletonList(t1p), Errors.NONE)); + + partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p))); + + // This will trigger rebalance. + coordinator.poll(time.timer(Long.MAX_VALUE)); + + // Make sure that the metadata was refreshed during the rebalance and thus subscriptions now contain only one topic. + assertEquals(singleton(topic1), subscriptions.subscription()); + + // Refresh the metadata again. Since there have been no changes since the last refresh, it won't trigger + // rebalance again. + metadata.requestUpdate(); + consumerClient.poll(time.timer(Long.MAX_VALUE)); + assertFalse(coordinator.rejoinNeededOrPending()); + } + } + @Test public void testThrowOnUnsupportedStableFlag() { supportStableFlag((short) 6, true); From 9e544b2703c6a31b41df0b996b729c4e60acccb1 Mon Sep 17 00:00:00 2001 From: rivensun Date: Sun, 31 Oct 2021 12:55:20 +0800 Subject: [PATCH 09/21] fix merge conflicts Author: RivenSun2 Reviewers: Luke Chen --- .../internals/ConsumerCoordinatorTest.java | 86 +++++++++---------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 7c02e05b31990..a00f9b9a5b08f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -997,6 +997,49 @@ public void testForceMetadataRefreshForPatternSubscriptionDuringRebalance() { assertFalse(coordinator.rejoinNeededOrPending()); } + @Test + public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() { + try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) { + subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener); + client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, new HashMap() { + { + put(topic1, 1); + put(topic2, 1); + } + })); + coordinator.maybeUpdateSubscriptionMetadata(); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), subscriptions.subscription()); + + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + // Instrument the test so that metadata will contain only one topic after next refresh. + client.prepareMetadataUpdate(deletedMetadataResponse); + + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); + client.prepareResponse(body -> { + SyncGroupRequest sync = (SyncGroupRequest) body; + return sync.data().memberId().equals(consumerId) && + sync.data().generationId() == 1 && + sync.groupAssignments().isEmpty(); + }, syncGroupResponse(singletonList(t1p), Errors.NONE)); + + partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p))); + + // This will trigger rebalance. + coordinator.poll(time.timer(Long.MAX_VALUE)); + + // Make sure that the metadata was refreshed during the rebalance and thus subscriptions now contain only one topic. + assertEquals(singleton(topic1), subscriptions.subscription()); + + // Refresh the metadata again. Since there have been no changes since the last refresh, it won't trigger + // rebalance again. + metadata.requestUpdate(); + consumerClient.poll(time.timer(Long.MAX_VALUE)); + assertFalse(coordinator.rejoinNeededOrPending()); + } + } + /** * Verifies that the consumer re-joins after a metadata change. If JoinGroup fails * and metadata reverts to its original value, the consumer should still retry JoinGroup. @@ -2845,49 +2888,6 @@ public void testConsumerRejoinAfterRebalance() { } } - @Test - public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() { - try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) { - subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener); - client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, new HashMap() { - { - put(topic1, 1); - put(topic2, 1); - } - })); - coordinator.maybeUpdateSubscriptionMetadata(); - assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), subscriptions.subscription()); - - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); - coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); - - // Instrument the test so that metadata will contain only one topic after next refresh. - client.prepareMetadataUpdate(deletedMetadataResponse); - - client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); - client.prepareResponse(body -> { - SyncGroupRequest sync = (SyncGroupRequest) body; - return sync.data().memberId().equals(consumerId) && - sync.data().generationId() == 1 && - sync.groupAssignments().isEmpty(); - }, syncGroupResponse(singletonList(t1p), Errors.NONE)); - - partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p))); - - // This will trigger rebalance. - coordinator.poll(time.timer(Long.MAX_VALUE)); - - // Make sure that the metadata was refreshed during the rebalance and thus subscriptions now contain only one topic. - assertEquals(singleton(topic1), subscriptions.subscription()); - - // Refresh the metadata again. Since there have been no changes since the last refresh, it won't trigger - // rebalance again. - metadata.requestUpdate(); - consumerClient.poll(time.timer(Long.MAX_VALUE)); - assertFalse(coordinator.rejoinNeededOrPending()); - } - } - @Test public void testThrowOnUnsupportedStableFlag() { supportStableFlag((short) 6, true); From a5f6f81e836628f871da8887a1535e422b26906f Mon Sep 17 00:00:00 2001 From: rivensun Date: Thu, 4 Nov 2021 15:20:22 +0800 Subject: [PATCH 10/21] commit code changes for review Author: RivenSun2 riven.sun@zoom.us --- .../internals/AbstractCoordinator.java | 11 ++- .../internals/ConsumerCoordinator.java | 70 +++++-------------- .../internals/ConsumerCoordinatorTest.java | 12 ++-- 3 files changed, 30 insertions(+), 63 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 1a5d06ed9c6ce..af4820392a62c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -404,7 +404,7 @@ private void closeHeartbeatThread() { * * @param timer Timer bounding how long this method can block * @throws KafkaException if the callback throws exception - * @return true if the operation succeeded + * @return true iff the operation succeeded */ boolean joinGroupIfNeeded(final Timer timer) { while (rejoinNeededOrPending()) { @@ -421,14 +421,13 @@ boolean joinGroupIfNeeded(final Timer timer) { // need to set the flag before calling onJoinPrepare since the user callback may throw // exception, in which case upon retry we should not retry onJoinPrepare either. needsJoinPrepare = false; - if (!onJoinPrepare(generation.generationId, generation.memberId)) + if (!onJoinPrepare(generation.generationId, generation.memberId)) { needsJoinPrepare = true; + //should not initiateJoinGroup if needsJoinPrepare still is true + return false; + } } - //should not initiateJoinGroup if needsJoinPrepare still is true - if (needsJoinPrepare) - return false; - final RequestFuture future = initiateJoinGroup(); client.poll(future, timer); if (!future.isDone()) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 9fa704c53c440..96d4604d4f037 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -95,7 +95,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private final SubscriptionState subscriptions; private final OffsetCommitCallback defaultOffsetCommitCallback; private final boolean autoCommitEnabled; - private RequestFuture onJoinPrepareAsyncCommitFuture = null; private final int autoCommitIntervalMs; private final ConsumerInterceptors interceptors; private final AtomicInteger pendingAsyncCommits; @@ -695,15 +694,19 @@ private void validateCooperativeAssignment(final Map future = maybeAutoCommitOffsetsAsync(); + if (future == null) onJoinPrepareAsyncCommitSucceeded = true; - // consistent with async auto-commit failures, we do not propagate the exception - log.warn("Asynchronous auto-commit offsets failed: {}", e.getMessage()); + else { + if (future.succeeded()) { + onJoinPrepareAsyncCommitSucceeded = true; + } else if (future.failed() && !future.isRetriable()) { + // consistent with async auto-commit failures, we do not propagate the exception + log.warn("Asynchronous auto-commit offsets failed: {}", future.exception().getMessage()); + onJoinPrepareAsyncCommitSucceeded = true; + } } // the generation / member-id can possibly be reset by the heartbeat thread @@ -920,7 +923,7 @@ public void close(final Timer timer) { // we do not need to re-enable wakeups since we are closing already client.disableWakeups(); try { - maybeAutoCommitOffsetsSync(timer); + maybeAutoCommitOffsetsAsync(); while (pendingAsyncCommits.get() > 0 && timer.notExpired()) { ensureCoordinatorReady(timer); client.poll(timer); @@ -1062,12 +1065,12 @@ public void maybeAutoCommitOffsetsAsync(long now) { nextAutoCommitTimer.update(now); if (nextAutoCommitTimer.isExpired()) { nextAutoCommitTimer.reset(autoCommitIntervalMs); - doAutoCommitOffsetsAsync(); + autoCommitOffsetsAsync(); } } } - private RequestFuture doAutoCommitOffsetsAsync() { + private RequestFuture autoCommitOffsetsAsync() { Map allConsumedOffsets = subscriptions.allConsumed(); log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets); @@ -1086,49 +1089,14 @@ private RequestFuture doAutoCommitOffsetsAsync() { }); } - private boolean maybeAutoCommitOffsetsAsync() { + private RequestFuture maybeAutoCommitOffsetsAsync() { if (autoCommitEnabled) { - invokeCompletedOffsetCommitCallbacks(); - - if (onJoinPrepareAsyncCommitFuture == null) - onJoinPrepareAsyncCommitFuture = doAutoCommitOffsetsAsync(); - if (onJoinPrepareAsyncCommitFuture == null) - return true; - + RequestFuture future = autoCommitOffsetsAsync(); client.pollNoWakeup(); invokeCompletedOffsetCommitCallbacks(); - - if (!onJoinPrepareAsyncCommitFuture.isDone()) - return false; - if (onJoinPrepareAsyncCommitFuture.succeeded()) { - onJoinPrepareAsyncCommitFuture = null; - return true; - } - if (onJoinPrepareAsyncCommitFuture.failed() && !onJoinPrepareAsyncCommitFuture.isRetriable()) - throw onJoinPrepareAsyncCommitFuture.exception(); - //retry Sending asynchronous auto-commit - onJoinPrepareAsyncCommitFuture = doAutoCommitOffsetsAsync(); - return false; + return future; } else - return true; - } - - private void maybeAutoCommitOffsetsSync(Timer timer) { - if (autoCommitEnabled) { - Map allConsumedOffsets = subscriptions.allConsumed(); - try { - log.debug("Sending synchronous auto-commit of offsets {}", allConsumedOffsets); - if (!commitOffsetsSync(allConsumedOffsets, timer)) - log.debug("Auto-commit of offsets {} timed out before completion", allConsumedOffsets); - } catch (WakeupException | InterruptException e) { - log.debug("Auto-commit of offsets {} was interrupted before completion", allConsumedOffsets); - // rethrow wakeups since they are triggered by the user - throw e; - } catch (Exception e) { - // consistent with async auto-commit failures, we do not propagate the exception - log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumedOffsets, e.getMessage()); - } - } + return null; } private class DefaultOffsetCommitCallback implements OffsetCommitCallback { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index c46e0a2fc1e13..6f5ddf1bdce4d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -151,11 +151,6 @@ public abstract class ConsumerCoordinatorTest { put(topic2, 1); } }); - private MetadataResponse deletedMetadataResponse = RequestTestUtils.metadataUpdateWith(1, new HashMap() { - { - put(topic1, 1); - } - }); private Node node = metadataResponse.brokers().iterator().next(); private SubscriptionState subscriptions; private ConsumerMetadata metadata; @@ -1060,7 +1055,7 @@ public void testForceMetadataRefreshForPatternSubscriptionDuringRebalance() { @Test public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() { - try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true)) { + try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) { subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener); client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, new HashMap() { { @@ -1074,6 +1069,11 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + MetadataResponse deletedMetadataResponse = RequestTestUtils.metadataUpdateWith(1, new HashMap() { + { + put(topic1, 1); + } + }); // Instrument the test so that metadata will contain only one topic after next refresh. client.prepareMetadataUpdate(deletedMetadataResponse); From 12359c29e328aed05351c369ef271c7f12d06ce4 Mon Sep 17 00:00:00 2001 From: rivensun Date: Fri, 5 Nov 2021 22:24:28 +0800 Subject: [PATCH 11/21] commit code changes for showuon's review --- .../consumer/internals/AbstractCoordinator.java | 1 + .../consumer/internals/ConsumerCoordinator.java | 15 +++++++++------ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index af4820392a62c..1136769c7648b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -421,6 +421,7 @@ boolean joinGroupIfNeeded(final Timer timer) { // need to set the flag before calling onJoinPrepare since the user callback may throw // exception, in which case upon retry we should not retry onJoinPrepare either. needsJoinPrepare = false; + // return false when onJoinPrepare is waiting for committing offset if (!onJoinPrepare(generation.generationId, generation.memberId)) { needsJoinPrepare = true; //should not initiateJoinGroup if needsJoinPrepare still is true diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 96d4604d4f037..185619118d592 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -694,18 +694,21 @@ private void validateCooperativeAssignment(final Map future = maybeAutoCommitOffsetsAsync(); if (future == null) - onJoinPrepareAsyncCommitSucceeded = true; + onJoinPrepareAsyncCommitCompleted = true; else { if (future.succeeded()) { - onJoinPrepareAsyncCommitSucceeded = true; - } else if (future.failed() && !future.isRetriable()) { + onJoinPrepareAsyncCommitCompleted = true; + } else if (future.failed()) { // consistent with async auto-commit failures, we do not propagate the exception log.warn("Asynchronous auto-commit offsets failed: {}", future.exception().getMessage()); - onJoinPrepareAsyncCommitSucceeded = true; + if (!future.isRetriable()) + onJoinPrepareAsyncCommitCompleted = true; } } @@ -763,7 +766,7 @@ protected boolean onJoinPrepare(int generation, String memberId) { if (exception != null) { throw new KafkaException("User rebalance callback throws an error", exception); } - return onJoinPrepareAsyncCommitSucceeded; + return onJoinPrepareAsyncCommitCompleted; } @Override From 359af7d1ed2fb22bfb2cb163d4c62dc7156d967d Mon Sep 17 00:00:00 2001 From: rivensun Date: Mon, 15 Nov 2021 10:50:26 +0800 Subject: [PATCH 12/21] commit code changes for showuon's review --- .../internals/ConsumerCoordinator.java | 22 ++---- .../internals/ConsumerCoordinatorTest.java | 78 +++++++++++++++++++ 2 files changed, 85 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 185619118d592..7430d9f48acdf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -696,21 +696,13 @@ protected boolean onJoinPrepare(int generation, String memberId) { log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId); boolean onJoinPrepareAsyncCommitCompleted = false; // async commit offsets prior to rebalance if auto-commit enabled - // and if auto-commit disable or the coordinatorUnknown is true, the future will be null, - // the asynchronous commit operation will not do. RequestFuture future = maybeAutoCommitOffsetsAsync(); - if (future == null) + // return true when + // 1. future is null, which means no commit request sent, so it is still considered completed + // 2. offset commit completed + // 3. offset commit failed with non-retriable error + if (future == null || future.succeeded() || (future.failed() && !future.isRetriable())) onJoinPrepareAsyncCommitCompleted = true; - else { - if (future.succeeded()) { - onJoinPrepareAsyncCommitCompleted = true; - } else if (future.failed()) { - // consistent with async auto-commit failures, we do not propagate the exception - log.warn("Asynchronous auto-commit offsets failed: {}", future.exception().getMessage()); - if (!future.isRetriable()) - onJoinPrepareAsyncCommitCompleted = true; - } - } // the generation / member-id can possibly be reset by the heartbeat thread // upon getting errors or heartbeat timeouts; in this case whatever is previously @@ -1098,8 +1090,8 @@ private RequestFuture maybeAutoCommitOffsetsAsync() { client.pollNoWakeup(); invokeCompletedOffsetCommitCallbacks(); return future; - } else - return null; + } + return null; } private class DefaultOffsetCommitCallback implements OffsetCommitCallback { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 6f5ddf1bdce4d..1846055c1c0ea 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -1101,6 +1101,84 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() { } } + @Test + public void testJoinPrepareWithDisableAutoCommit() { + try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"))) { + coordinator.ensureActiveGroup(); + + prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE); + + int generationId = 42; + String memberId = "consumer-42"; + + boolean res = coordinator.onJoinPrepare(generationId, memberId); + + assertTrue(res); + assertTrue(client.hasPendingResponses()); + assertFalse(client.hasInFlightRequests()); + assertFalse(coordinator.coordinatorUnknown()); + } + } + + @Test + public void testJoinPrepareAndCommitCompleted() { + try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.of("group-id"))) { + coordinator.ensureActiveGroup(); + + prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE); + int generationId = 42; + String memberId = "consumer-42"; + + boolean res = coordinator.onJoinPrepare(generationId, memberId); + coordinator.invokeCompletedOffsetCommitCallbacks(); + + assertTrue(res); + assertFalse(client.hasPendingResponses()); + assertFalse(client.hasInFlightRequests()); + assertFalse(coordinator.coordinatorUnknown()); + } + } + + @Test + public void testJoinPrepareAndCommitWithCoordinatorNotAvailable() { + try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.of("group-id"))) { + coordinator.ensureActiveGroup(); + + prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE); + + int generationId = 42; + String memberId = "consumer-42"; + + boolean res = coordinator.onJoinPrepare(generationId, memberId); + coordinator.invokeCompletedOffsetCommitCallbacks(); + + assertFalse(res); + assertFalse(client.hasPendingResponses()); + assertFalse(client.hasInFlightRequests()); + assertTrue(coordinator.coordinatorUnknown()); + } + } + + @Test + public void testJoinPrepareAndCommitWithUnknownMemberId() { + try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.of("group-id"))) { + coordinator.ensureActiveGroup(); + + prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID); + + int generationId = 42; + String memberId = "consumer-42"; + + boolean res = coordinator.onJoinPrepare(generationId, memberId); + coordinator.invokeCompletedOffsetCommitCallbacks(); + + assertTrue(res); + assertFalse(client.hasPendingResponses()); + assertFalse(client.hasInFlightRequests()); + assertFalse(coordinator.coordinatorUnknown()); + } + } + /** * Verifies that the consumer re-joins after a metadata change. If JoinGroup fails * and metadata reverts to its original value, the consumer should still retry JoinGroup. From 82b3585ccc23332465a48b55abc125a1b5d9b984 Mon Sep 17 00:00:00 2001 From: rivensun Date: Thu, 23 Dec 2021 15:14:23 +0800 Subject: [PATCH 13/21] commit code changes for guozhangwang's review --- .../consumer/internals/ConsumerCoordinator.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 7430d9f48acdf..58fcf82060fae 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -701,8 +701,15 @@ protected boolean onJoinPrepare(int generation, String memberId) { // 1. future is null, which means no commit request sent, so it is still considered completed // 2. offset commit completed // 3. offset commit failed with non-retriable error - if (future == null || future.succeeded() || (future.failed() && !future.isRetriable())) + if (future == null) onJoinPrepareAsyncCommitCompleted = true; + else if (future.succeeded()) + onJoinPrepareAsyncCommitCompleted = true; + else if (future.failed() && !future.isRetriable()) { + log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage()); + onJoinPrepareAsyncCommitCompleted = true; + } + // the generation / member-id can possibly be reset by the heartbeat thread // upon getting errors or heartbeat timeouts; in this case whatever is previously @@ -1085,12 +1092,8 @@ private RequestFuture autoCommitOffsetsAsync() { } private RequestFuture maybeAutoCommitOffsetsAsync() { - if (autoCommitEnabled) { - RequestFuture future = autoCommitOffsetsAsync(); - client.pollNoWakeup(); - invokeCompletedOffsetCommitCallbacks(); - return future; - } + if (autoCommitEnabled) + return autoCommitOffsetsAsync(); return null; } From a98bd876a3beb7b4ed043d5103ce0f828fcc954a Mon Sep 17 00:00:00 2001 From: rivensun Date: Fri, 24 Dec 2021 11:30:28 +0800 Subject: [PATCH 14/21] fix issue for ConsumerBounceTest testClose case failed --- .../test/scala/integration/kafka/api/ConsumerBounceTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index 9fc8727fdf8dc..9d969188e53cd 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -288,7 +288,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { servers.foreach(server => killBroker(server.config.brokerId)) val closeTimeout = 2000 val future1 = submitCloseAndValidate(consumer1, closeTimeout, None, Some(closeTimeout)) - val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, Some(requestTimeout), Some(requestTimeout)) + val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(requestTimeout)) future1.get future2.get } From efef844824588e891b3e9da05c56066acc432629 Mon Sep 17 00:00:00 2001 From: rivensun Date: Tue, 28 Dec 2021 18:06:43 +0800 Subject: [PATCH 15/21] fix issue for KafkaConsumerTest testCase failed --- .../org/apache/kafka/clients/consumer/KafkaConsumerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index bc7d506275e93..725d791679a5f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -2230,7 +2230,7 @@ public void testInvalidGroupMetadata() throws InterruptedException { MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, - new RoundRobinAssignor(), true, groupInstanceId); + new RoundRobinAssignor(), false, groupInstanceId); consumer.subscribe(singletonList(topic)); // concurrent access is illegal client.enableBlockingUntilWakeup(1); @@ -2849,7 +2849,7 @@ private static boolean consumerMetricPresent(KafkaConsumer consu @Test public void testClosingConsumerUnregistersConsumerMetrics() { - Time time = new MockTime(); + Time time = new MockTime(1L); SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); From 12fb32c29c11a330bf785e1920bd00e9af8d3308 Mon Sep 17 00:00:00 2001 From: rivensun Date: Tue, 28 Dec 2021 22:44:34 +0800 Subject: [PATCH 16/21] fix issue for testCloseManualAssignment case failed --- .../clients/consumer/internals/ConsumerCoordinatorTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 1846055c1c0ea..ae07f1ef7da11 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -3202,6 +3202,11 @@ private void gracefulCloseTest(ConsumerCoordinator coordinator, boolean shouldLe OffsetCommitRequest commitRequest = (OffsetCommitRequest) body; return commitRequest.data().groupId().equals(groupId); }, new OffsetCommitResponse(new OffsetCommitResponseData())); + client.prepareResponse(body -> { + commitRequested.set(true); + OffsetCommitRequest commitRequest = (OffsetCommitRequest) body; + return commitRequest.data().groupId().equals(groupId); + }, new OffsetCommitResponse(new OffsetCommitResponseData())); client.prepareResponse(body -> { leaveGroupRequested.set(true); LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body; From 996b8d9c34798dddb0aeb53efeab83436429e5fc Mon Sep 17 00:00:00 2001 From: rivensun Date: Wed, 29 Dec 2021 09:17:53 +0800 Subject: [PATCH 17/21] LeaveGroupResponse should be prepared conditionally --- .../consumer/internals/ConsumerCoordinatorTest.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index ae07f1ef7da11..e9e55b3985af3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -3202,17 +3202,18 @@ private void gracefulCloseTest(ConsumerCoordinator coordinator, boolean shouldLe OffsetCommitRequest commitRequest = (OffsetCommitRequest) body; return commitRequest.data().groupId().equals(groupId); }, new OffsetCommitResponse(new OffsetCommitResponseData())); + if (shouldLeaveGroup) + client.prepareResponse(body -> { + leaveGroupRequested.set(true); + LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body; + return leaveRequest.data().groupId().equals(groupId); + }, new LeaveGroupResponse(new LeaveGroupResponseData() + .setErrorCode(Errors.NONE.code()))); client.prepareResponse(body -> { commitRequested.set(true); OffsetCommitRequest commitRequest = (OffsetCommitRequest) body; return commitRequest.data().groupId().equals(groupId); }, new OffsetCommitResponse(new OffsetCommitResponseData())); - client.prepareResponse(body -> { - leaveGroupRequested.set(true); - LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body; - return leaveRequest.data().groupId().equals(groupId); - }, new LeaveGroupResponse(new LeaveGroupResponseData() - .setErrorCode(Errors.NONE.code()))); coordinator.close(); assertTrue(commitRequested.get(), "Commit not requested"); From de1580cfef85118ba3bd2b7a22a17851cc524d40 Mon Sep 17 00:00:00 2001 From: rivensun Date: Wed, 29 Dec 2021 15:43:16 +0800 Subject: [PATCH 18/21] fix issue for testInvalidGroupMetadata case --- .../org/apache/kafka/clients/consumer/KafkaConsumerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 725d791679a5f..f86b6e311c04a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -2230,7 +2230,7 @@ public void testInvalidGroupMetadata() throws InterruptedException { MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, - new RoundRobinAssignor(), false, groupInstanceId); + new RoundRobinAssignor(), true, groupInstanceId); consumer.subscribe(singletonList(topic)); // concurrent access is illegal client.enableBlockingUntilWakeup(1); @@ -2247,7 +2247,7 @@ public void testInvalidGroupMetadata() throws InterruptedException { } // accessing closed consumer is illegal - consumer.close(Duration.ofSeconds(5)); + consumer.close(Duration.ZERO); assertThrows(IllegalStateException.class, consumer::groupMetadata); } From f78969c759058b791117b709475714976283b290 Mon Sep 17 00:00:00 2001 From: RivenSun2 Date: Tue, 8 Feb 2022 13:51:59 +0800 Subject: [PATCH 19/21] branch conflicts resolved --- .../org/apache/kafka/clients/consumer/KafkaConsumerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index f8b86b83cd9b7..27c108bcdacac 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -2182,7 +2182,7 @@ public void testInvalidGroupMetadata() throws InterruptedException { } // accessing closed consumer is illegal - consumer.close(Duration.ofSeconds(5)); + consumer.close(Duration.ZERO); assertThrows(IllegalStateException.class, consumer::groupMetadata); } @@ -2766,6 +2766,7 @@ private static boolean consumerMetricPresent(KafkaConsumer consu @Test public void testClosingConsumerUnregistersConsumerMetrics() { + Time time = new MockTime(1L); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); From e0ab2e5d40fbca4f2e8f9864bf3e0af5ac921fae Mon Sep 17 00:00:00 2001 From: RivenSun2 Date: Wed, 9 Feb 2022 09:40:01 +0800 Subject: [PATCH 20/21] in order to retrigger jenkins tests --- .../kafka/clients/consumer/internals/ConsumerCoordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index cffdca577aca7..533f3cb40f280 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -714,7 +714,7 @@ protected boolean onJoinPrepare(int generation, String memberId) { // return true when // 1. future is null, which means no commit request sent, so it is still considered completed // 2. offset commit completed - // 3. offset commit failed with non-retriable error + // 3. offset commit failed with non-retriable exception if (future == null) onJoinPrepareAsyncCommitCompleted = true; else if (future.succeeded()) From 4ade6dd8567ef49d39c4477fd7455ff4e853a205 Mon Sep 17 00:00:00 2001 From: RivenSun2 Date: Wed, 2 Mar 2022 16:12:37 +0800 Subject: [PATCH 21/21] KAFKA-13694: When the Broker side processes the ProduceRequest, it prints more specific information when the verification record fails. Story JIRA: https://issues.apache.org/jira/browse/KAFKA-13694 --- .../kafka/common/requests/ProduceResponse.java | 8 ++++++++ core/src/main/scala/kafka/log/LogValidator.scala | 12 ++++-------- .../test/scala/unit/kafka/log/LogValidatorTest.scala | 9 +++++---- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 9b945366013b8..7c9d70b8d6784 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -226,6 +226,14 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(batchIndex, message); } + + @Override + public String toString() { + return "RecordError(" + + "batchIndex=" + batchIndex + + ", message=" + ((message == null) ? "null" : "'" + message + "'") + + ")"; + } } public static ProduceResponse parse(ByteBuffer buffer, short version) { diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 925c60294a6af..b40598f243084 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -22,7 +22,7 @@ import kafka.common.{LongRef, RecordValidationException} import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec} import kafka.server.{BrokerTopicStats, RequestLocal} import kafka.utils.Logging -import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} +import org.apache.kafka.common.errors.{CorruptRecordException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} import org.apache.kafka.common.record.{AbstractRecords, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType} import org.apache.kafka.common.InvalidRecordException import org.apache.kafka.common.TopicPartition @@ -571,13 +571,9 @@ private[log] object LogValidator extends Logging { private def processRecordErrors(recordErrors: Seq[ApiRecordError]): Unit = { if (recordErrors.nonEmpty) { val errors = recordErrors.map(_.recordError) - if (recordErrors.exists(_.apiError == Errors.INVALID_TIMESTAMP)) { - throw new RecordValidationException(new InvalidTimestampException( - "One or more records have been rejected due to invalid timestamp"), errors) - } else { - throw new RecordValidationException(new InvalidRecordException( - "One or more records have been rejected"), errors) - } + throw new RecordValidationException(new InvalidRecordException( + "One or more records have been rejected due to " + errors.size + " record errors " + + "in total, and only showing the first three errors at most: " + errors.asJava.subList(0, math.min(errors.size, 3))), errors) } } diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index 4275684230736..5676a8ee94121 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -25,7 +25,7 @@ import kafka.message._ import kafka.metrics.KafkaYammerMetrics import kafka.server.{BrokerTopicStats, RequestLocal} import kafka.utils.TestUtils.meterCount -import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} +import org.apache.kafka.common.errors.{UnsupportedCompressionTypeException, UnsupportedForMessageFormatException} import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Time import org.apache.kafka.common.{InvalidRecordException, TopicPartition} @@ -1352,7 +1352,7 @@ class LogValidatorTest { requestLocal = RequestLocal.withThreadConfinedCaching) ) - assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException]) + assertTrue(e.invalidException.isInstanceOf[InvalidRecordException]) assertTrue(e.recordErrors.nonEmpty) assertEquals(e.recordErrors.size, 3) } @@ -1397,8 +1397,9 @@ class LogValidatorTest { RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP) ) // if there is a mix of both regular InvalidRecordException and InvalidTimestampException, - // InvalidTimestampException takes precedence - assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException]) + // InvalidTimestampException is no longer takes precedence. The type of invalidException + // is unified as InvalidRecordException + assertTrue(e.invalidException.isInstanceOf[InvalidRecordException]) assertTrue(e.recordErrors.nonEmpty) assertEquals(6, e.recordErrors.size) }