From a03bcdfb7507385df14cfbc9d423805dc36ab960 Mon Sep 17 00:00:00 2001 From: Philip Nee Date: Wed, 29 Nov 2023 14:57:40 -0800 Subject: [PATCH 1/3] invoke partition revocation on the application thread --- .../internals/AsyncKafkaConsumer.java | 52 +++++++++++++- .../internals/ConsumerNetworkThread.java | 72 +++++++++++-------- .../internals/NetworkClientDelegate.java | 33 ++++++--- .../internals/ConsumerNetworkThreadTest.java | 21 +----- .../internals/ConsumerTestBuilder.java | 49 ++++++------- 5 files changed, 146 insertions(+), 81 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 0e93b5f6e21c8..c487aa7b528c3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -97,6 +97,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -233,6 +234,7 @@ private void process(final GroupMetadataUpdateEvent event) { private final Metrics metrics; private final long retryBackoffMs; private final int defaultApiTimeoutMs; + private final boolean autoCommitEnabled; private volatile boolean closed = false; private final List assignors; private final Optional clientTelemetryReporter; @@ -264,6 +266,7 @@ private void process(final GroupMetadataUpdateEvent event) { GroupRebalanceConfig.ProtocolType.CONSUMER ); this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); + this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); LogContext logContext = createLogContext(config, groupRebalanceConfig); this.log = logContext.logger(getClass()); @@ -378,7 +381,8 @@ private void process(final GroupMetadataUpdateEvent event) { long retryBackoffMs, int defaultApiTimeoutMs, List assignors, - String groupId) { + String groupId, + boolean autoCommitEnabled) { this.log = logContext.logger(getClass()); this.subscriptions = subscriptions; this.clientId = clientId; @@ -398,6 +402,7 @@ private void process(final GroupMetadataUpdateEvent event) { this.assignors = assignors; this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer"); this.clientTelemetryReporter = Optional.empty(); + this.autoCommitEnabled = autoCommitEnabled; } // Visible for testing @@ -413,6 +418,7 @@ private void process(final GroupMetadataUpdateEvent event) { this.log = logContext.logger(getClass()); this.subscriptions = subscriptions; this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); + this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); this.fetchBuffer = new FetchBuffer(logContext); this.isolationLevel = IsolationLevel.READ_UNCOMMITTED; this.interceptors = new ConsumerInterceptors<>(Collections.emptyList()); @@ -1031,6 +1037,7 @@ private void close(Duration timeout, boolean swallowException) { // Invoke all callbacks after the background thread exists in case if there are unsent async // commits maybeInvokeCommitCallbacks(); + closeQuietly(() -> shutdownNetworkThread(timeout), "Failed to shutdown network thread", firstException); closeQuietly(fetchBuffer, "Failed to close the fetch buffer", firstException); closeQuietly(interceptors, "consumer interceptors", firstException); @@ -1050,6 +1057,49 @@ private void close(Duration timeout, boolean swallowException) { } } + private void shutdownNetworkThread(final Duration timeout) { + try { + prepShutdown(time.timer(timeout)); + } catch (Exception e) { + log.error("Error occurred during shutdown. Proceed closing the consumer.", e); + } finally { + // Once all partitions are revoked, we proceed with sending a leave group request and closing the network + // thread. + subscriptions.assignFromSubscribed(Collections.emptySet()); + if (applicationEventHandler != null) + applicationEventHandler.close(timeout); + } + } + /** + * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: + * 1. autocommit offsets + * 2. invoke completed offset commit callbacks + * 3. revoke all partitions + */ + private void prepShutdown(final Timer timer) throws ExecutionException, InterruptedException, java.util.concurrent.TimeoutException { + if (autoCommitEnabled) { + Map allConsumed = subscriptions.allConsumed(); + commitSync(allConsumed, Duration.ofMillis(timer.remainingMs())); + } + // Invoke all callbacks after the background thread exists in case if there are unsent async + // commits + maybeInvokeCommitCallbacks(); + timer.update(); + revokePartitionsOnUnsubscribe().get(timer.remainingMs(), TimeUnit.MILLISECONDS); + subscriptions.assignFromSubscribed(Collections.emptySet()); + } + + private CompletableFuture revokePartitionsOnUnsubscribe() { + Set droppedPartitions = new HashSet<>(); + droppedPartitions.addAll(subscriptions.assignedPartitions()); + if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + // TODO: KAFKA-15276 + return CompletableFuture.completedFuture(null); + } + @Override public void wakeup() { wakeupTrigger.wakeup(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index 07bb9811b5b6f..afb7e8f9e0885 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -38,7 +38,10 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -62,6 +65,7 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { private ApplicationEventProcessor applicationEventProcessor; private NetworkClientDelegate networkClientDelegate; private RequestManagers requestManagers; + private Optional membershipManager; private volatile boolean running; private final IdempotentCloser closer = new IdempotentCloser(); private volatile Duration closeTimeout = Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS); @@ -106,6 +110,7 @@ void initializeResources() { applicationEventProcessor = applicationEventProcessorSupplier.get(); networkClientDelegate = networkClientDelegateSupplier.get(); requestManagers = requestManagersSupplier.get(); + membershipManager = requestManagersSupplier.get().membershipManager; } /** @@ -274,15 +279,15 @@ private void closeInternal(final Duration timeout) { } void cleanup() { + log.trace("Closing the consumer network thread"); + Timer timer = time.timer(closeTimeout); try { - log.trace("Closing the consumer network thread"); - Timer timer = time.timer(closeTimeout); - maybeAutocommitOnClose(timer); runAtClose(requestManagers.entries(), networkClientDelegate, timer); maybeLeaveGroup(timer); } catch (Exception e) { log.error("Unexpected error during shutdown. Proceed with closing.", e); } finally { + networkClientDelegate.awaitPendingRequests(timer); closeQuietly(requestManagers, "request managers"); closeQuietly(networkClientDelegate, "network client delegate"); closeQuietly(applicationEventProcessor, "application event processor"); @@ -291,38 +296,49 @@ void cleanup() { } /** - * We need to autocommit before shutting down the consumer. The method needs to first connect to the coordinator - * node to construct the closing requests. Then wait for all closing requests to finish before returning. The - * method is bounded by a closing timer. We will continue closing down the consumer if the requests cannot be - * completed in time. + * Leave the group when the consumer is shutting down. */ - // Visible for testing - void maybeAutocommitOnClose(final Timer timer) { - if (!requestManagers.coordinatorRequestManager.isPresent()) - return; - - if (!requestManagers.commitRequestManager.isPresent()) { - log.error("Expecting a CommitRequestManager but the object was never initialized. Shutting down."); - return; - } - - if (!requestManagers.commitRequestManager.get().canAutoCommit()) { + void maybeLeaveGroup(final Timer timer) throws ExecutionException, InterruptedException, TimeoutException { + if (!membershipManager.isPresent()) { return; } + // The partition should already been revoked at this point, so we invoke leaveGroup to complete the state + // transition and poll the heartbeatRequestManager one last time. + membershipManager.get().leaveGroup().get(timer.remainingMs(), TimeUnit.MILLISECONDS); + sendLeaveGroupOnClose(timer); + } + private void sendLeaveGroupOnClose(final Timer timer) { ensureCoordinatorReady(timer); - NetworkClientDelegate.UnsentRequest autocommitRequest = - requestManagers.commitRequestManager.get().createCommitAllConsumedRequest(); - networkClientDelegate.add(autocommitRequest); - do { - long currentTimeMs = timer.currentTimeMs(); - ensureCoordinatorReady(timer); - networkClientDelegate.poll(timer.remainingMs(), currentTimeMs); - } while (timer.notExpired() && !autocommitRequest.future().isDone()); + HeartbeatRequestManager hrm = requestManagers.heartbeatRequestManager.orElseThrow(() -> + new IllegalStateException( + "Expecting a HeartbeatRequestManager but the object was never initialized.")); + long nowMs = time.milliseconds(); + // Ensure the request gets sent out + networkClientDelegate.addAll(hrm.poll(nowMs)); + networkClientDelegate.poll(0, nowMs); } - void maybeLeaveGroup(final Timer timer) { - // TODO: Leave group upon closing the consumer + private void completePartitionRevocationOnClose(final Timer timer) throws ExecutionException, InterruptedException, TimeoutException { + CompletableFuture leaveGroupFuture = membershipManager.get().leaveGroup(); + while (timer.notExpired() && !leaveGroupFuture.isDone()) { + networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs()); + if (pollAssignmentChangeEvents(timer)) { + // Expecting a partitionRevocation completion event from the application thread + // Once completed. We are done with partition revocation and + break; + } + timer.update(); + } + } + + /** + * We need to continue to poll the ApplicationEventQueue during closing so that we can process the callback + * completion events and finish the leave group. + */ + private boolean pollAssignmentChangeEvents(final Timer timer) { + // TODO: To be implemented with KAFKA-15276 + return true; } private void ensureCoordinatorReady(final Timer timer) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index 141f5f955c8b5..10e0b345dcb71 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -64,10 +64,10 @@ public class NetworkClientDelegate implements AutoCloseable { private final long retryBackoffMs; public NetworkClientDelegate( - final Time time, - final ConsumerConfig config, - final LogContext logContext, - final KafkaClient client) { + final Time time, + final ConsumerConfig config, + final LogContext logContext, + final KafkaClient client) { this.time = time; this.client = client; this.log = logContext.logger(getClass()); @@ -130,6 +130,21 @@ public void poll(final long timeoutMs, final long currentTimeMs) { checkDisconnects(currentTimeMs); } + /** + * Block until all pending requests from the given node have finished. + */ + public void awaitPendingRequests(Timer timer) { + while (!unsentRequests().isEmpty() && timer.notExpired()) { + poll(timer.remainingMs(), timer.currentTimeMs()); + timer.update(); + } + + if (!unsentRequests.isEmpty()) { + log.warn("Close timed out with {} pending requests to coordinator, terminating client connections", + unsentRequests.size()); + } + } + /** * Tries to send the requests in the unsentRequest queue. If the request doesn't have an assigned node, it will * find the leastLoadedOne, and will be retried in the next {@code poll()}. If the request is expired, a @@ -310,11 +325,11 @@ Optional node() { @Override public String toString() { return "UnsentRequest{" + - "requestBuilder=" + requestBuilder + - ", handler=" + handler + - ", node=" + node + - ", timer=" + timer + - '}'; + "requestBuilder=" + requestBuilder + + ", handler=" + handler + + ", node=" + node + + ", timer=" + timer + + '}'; } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java index a1370918e4dcb..c478fc39290a2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java @@ -296,25 +296,8 @@ void testCoordinatorConnectionOnClose() { prepareOffsetCommitRequest(singletonMap(tp, 100L), Errors.NONE, false); consumerNetworkThread.cleanup(); assertTrue(coordinatorRequestManager.coordinator().isPresent()); - assertFalse(client.hasPendingResponses()); - assertFalse(client.hasInFlightRequests()); - } - - @Test - void testAutoCommitOnClose() { - TopicPartition tp = new TopicPartition("topic", 0); - Node node = metadata.fetch().nodes().get(0); - subscriptions.assignFromUser(singleton(tp)); - subscriptions.seek(tp, 100); - coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds()); - client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); - prepareOffsetCommitRequest(singletonMap(tp, 100L), Errors.NONE, false); - consumerNetworkThread.maybeAutocommitOnClose(time.timer(1000)); - assertTrue(coordinatorRequestManager.coordinator().isPresent()); - verify(commitRequestManager).createCommitAllConsumedRequest(); - - assertFalse(client.hasPendingResponses()); - assertFalse(client.hasInFlightRequests()); + assertFalse(client.hasPendingResponses(), "There should be no pending responses but found " + client.futureResponses()); + assertFalse(client.hasInFlightRequests(), "There should be no pending requests, but found " + client.requests()); } private void prepareTearDown() { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java index 97d53dead8534..2346427cf429a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java @@ -328,34 +328,35 @@ public AsyncKafkaConsumerTestBuilder(Optional groupInfo, boole super(groupInfo, enableAutoCommit, enableAutoTick); String clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); List assignors = ConsumerPartitionAssignor.getAssignorInstances( - config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), - config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)) + config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), + config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)) ); Deserializers deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer()); this.fetchCollector = spy(new FetchCollector<>(logContext, - metadata, - subscriptions, - fetchConfig, - deserializers, - metricsManager, - time)); + metadata, + subscriptions, + fetchConfig, + deserializers, + metricsManager, + time)); this.consumer = spy(new AsyncKafkaConsumer<>( - logContext, - clientId, - deserializers, - new FetchBuffer(logContext), - fetchCollector, - new ConsumerInterceptors<>(Collections.emptyList()), - time, - applicationEventHandler, - backgroundEventQueue, - metrics, - subscriptions, - metadata, - retryBackoffMs, - 60000, - assignors, - groupInfo.map(groupInformation -> groupInformation.groupState.groupId).orElse(null))); + logContext, + clientId, + deserializers, + new FetchBuffer(logContext), + fetchCollector, + new ConsumerInterceptors<>(Collections.emptyList()), + time, + applicationEventHandler, + backgroundEventQueue, + metrics, + subscriptions, + metadata, + retryBackoffMs, + 60000, + assignors, + groupInfo.map(groupInformation -> groupInformation.groupState.groupId).orElse(null), + enableAutoCommit)); } @Override From fa800ae2854405117d99cc8f5dba8e8c4daeb740 Mon Sep 17 00:00:00 2001 From: Philip Nee Date: Mon, 4 Dec 2023 14:01:57 -0800 Subject: [PATCH 2/3] Clean up tests and code Update AsyncKafkaConsumer.java test ensure heartbeat manager is polled Clean up Update AsyncKafkaConsumerTest.java clean up Update ConsumerNetworkThreadTest.java leave group on close stuff --- .../internals/AsyncKafkaConsumer.java | 80 ++++++++++-------- .../internals/ConsumerNetworkThread.java | 72 +++++++--------- .../consumer/internals/MembershipManager.java | 5 ++ .../internals/MembershipManagerImpl.java | 26 +++++- .../internals/AsyncKafkaConsumerTest.java | 83 ++++++++++++------- .../internals/ConsumerNetworkThreadTest.java | 27 +++++- 6 files changed, 182 insertions(+), 111 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index c487aa7b528c3..391120b9f99b5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -79,6 +79,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; import org.slf4j.Logger; +import org.slf4j.event.Level; import java.net.InetSocketAddress; import java.time.Duration; @@ -93,6 +94,8 @@ import java.util.Optional; import java.util.OptionalLong; import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -120,6 +123,7 @@ import static org.apache.kafka.common.utils.Utils.closeQuietly; import static org.apache.kafka.common.utils.Utils.isBlank; import static org.apache.kafka.common.utils.Utils.join; +import static org.apache.kafka.common.utils.Utils.swallow; /** * This {@link Consumer} implementation uses an {@link ApplicationEventHandler event handler} to process @@ -211,7 +215,7 @@ private void process(final GroupMetadataUpdateEvent event) { private final ApplicationEventHandler applicationEventHandler; private final Time time; - private Optional groupMetadata; + private Optional groupMetadata = Optional.empty(); private final KafkaConsumerMetrics kafkaConsumerMetrics; private Logger log; private final String clientId; @@ -1030,16 +1034,14 @@ private void close(Duration timeout, boolean swallowException) { final Timer closeTimer = time.timer(timeout); clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis())); closeTimer.update(); - + // Prepare shutting down the network thread + swallow(log, Level.ERROR, "Unexpected exception when preparing for shutdown", () -> prepareShutdown(closeTimer), firstException); + closeTimer.update(); if (applicationEventHandler != null) - closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed to close application event handler with a timeout(ms)=" + closeTimer.remainingMs(), firstException); - - // Invoke all callbacks after the background thread exists in case if there are unsent async - // commits - maybeInvokeCommitCallbacks(); - closeQuietly(() -> shutdownNetworkThread(timeout), "Failed to shutdown network thread", firstException); - - closeQuietly(fetchBuffer, "Failed to close the fetch buffer", firstException); + closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException); + closeTimer.update(); + // Ensure all async commit callbacks are invoked + swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback", this::maybeInvokeCommitCallbacks, firstException); closeQuietly(interceptors, "consumer interceptors", firstException); closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException); closeQuietly(metrics, "consumer metrics", firstException); @@ -1057,46 +1059,54 @@ private void close(Duration timeout, boolean swallowException) { } } - private void shutdownNetworkThread(final Duration timeout) { + /** + * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: + * 1. autocommit offsets + * 2. revoke all partitions + */ + private void prepareShutdown(final Timer timer) { + if (!groupMetadata.isPresent()) + return; + + maybeAutoCommitSync(timer); + timer.update(); + if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty()) + return; + try { - prepShutdown(time.timer(timeout)); + timer.update(); + // If the consumer is in a group, we will pause and revoke all assigned partitions + onLeavePrepare().get(timer.remainingMs(), TimeUnit.MILLISECONDS); } catch (Exception e) { - log.error("Error occurred during shutdown. Proceed closing the consumer.", e); + Exception exception = e; + if (e instanceof ExecutionException) + exception = (Exception) e.getCause(); + throw new KafkaException("User rebalance callback throws an error", exception); } finally { - // Once all partitions are revoked, we proceed with sending a leave group request and closing the network - // thread. subscriptions.assignFromSubscribed(Collections.emptySet()); - if (applicationEventHandler != null) - applicationEventHandler.close(timeout); } } - /** - * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: - * 1. autocommit offsets - * 2. invoke completed offset commit callbacks - * 3. revoke all partitions - */ - private void prepShutdown(final Timer timer) throws ExecutionException, InterruptedException, java.util.concurrent.TimeoutException { + + private void maybeAutoCommitSync(final Timer timer) { if (autoCommitEnabled) { Map allConsumed = subscriptions.allConsumed(); - commitSync(allConsumed, Duration.ofMillis(timer.remainingMs())); + try { + log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed); + commitSync(allConsumed, Duration.ofMillis(timer.remainingMs())); + } catch (Exception e) { + // consistent with async auto-commit failures, we do not propagate the exception + log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage()); + } } - // Invoke all callbacks after the background thread exists in case if there are unsent async - // commits - maybeInvokeCommitCallbacks(); - timer.update(); - revokePartitionsOnUnsubscribe().get(timer.remainingMs(), TimeUnit.MILLISECONDS); - subscriptions.assignFromSubscribed(Collections.emptySet()); } - private CompletableFuture revokePartitionsOnUnsubscribe() { - Set droppedPartitions = new HashSet<>(); + private CompletableFuture onLeavePrepare() { + SortedSet droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR); droppedPartitions.addAll(subscriptions.assignedPartitions()); if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) { return CompletableFuture.completedFuture(null); } - - // TODO: KAFKA-15276 + // TODO: Invoke rebalanceListener via KAFKA-15276 return CompletableFuture.completedFuture(null); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index afb7e8f9e0885..f0520230488a8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -38,10 +38,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -148,7 +145,6 @@ void runOnce() { .map(networkClientDelegate::addAll) .reduce(MAX_POLL_TIMEOUT_MS, Math::min); networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs); - cachedMaximumTimeToWait = requestManagers.entries().stream() .filter(Optional::isPresent) .map(Optional::get) @@ -199,11 +195,11 @@ static void runAtClose(final Collection> requ // Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until // all requests have received a response. - do { + while (timer.notExpired() && !requestFutures.stream().allMatch(Future::isDone)) { pollWaitTimeMs = Math.min(pollWaitTimeMs, timer.remainingMs()); networkClientDelegate.poll(pollWaitTimeMs, timer.currentTimeMs()); timer.update(); - } while (timer.notExpired() && !requestFutures.stream().allMatch(Future::isDone)); + } } public boolean isRunning() { @@ -282,8 +278,10 @@ void cleanup() { log.trace("Closing the consumer network thread"); Timer timer = time.timer(closeTimeout); try { + prepareForShutdown(timer); + // Send out the unsent commits and try to complete them before timer runs out runAtClose(requestManagers.entries(), networkClientDelegate, timer); - maybeLeaveGroup(timer); + maybeLeaveGroup(); } catch (Exception e) { log.error("Unexpected error during shutdown. Proceed with closing.", e); } finally { @@ -295,59 +293,49 @@ void cleanup() { } } + private void prepareForShutdown(final Timer timer) { + if (!requestManagers.coordinatorRequestManager.isPresent()) + return; + if (!requestManagers.commitRequestManager.isPresent()) { + log.error("Expecting a CommitRequestManager but the object was never initialized. Shutting down."); + return; + } + + // We need a coordinator node for the commit request manager to send the remaining commit requests + ensureCoordinatorReady(timer); + } + /** * Leave the group when the consumer is shutting down. */ - void maybeLeaveGroup(final Timer timer) throws ExecutionException, InterruptedException, TimeoutException { - if (!membershipManager.isPresent()) { + void maybeLeaveGroup() { + if (!membershipManager.isPresent()) return; - } - // The partition should already been revoked at this point, so we invoke leaveGroup to complete the state - // transition and poll the heartbeatRequestManager one last time. - membershipManager.get().leaveGroup().get(timer.remainingMs(), TimeUnit.MILLISECONDS); - sendLeaveGroupOnClose(timer); - } - private void sendLeaveGroupOnClose(final Timer timer) { - ensureCoordinatorReady(timer); + // We do not need to send leave group when the member is unsubscribed or fatal + if (membershipManager.get().shouldSkipHeartbeat()) + return; + + membershipManager.get().leaveGroupOnClose(); HeartbeatRequestManager hrm = requestManagers.heartbeatRequestManager.orElseThrow(() -> - new IllegalStateException( - "Expecting a HeartbeatRequestManager but the object was never initialized.")); + new IllegalStateException( + "Expecting a GroupHeartbeatRequest but the object was never initialized.")); + log.debug("Sending GroupHeartbeatRequest with epoch {} to coordinator {} to leave the group before close", + membershipManager.get().memberEpoch(), + requestManagers.coordinatorRequestManager.get().coordinator()); long nowMs = time.milliseconds(); // Ensure the request gets sent out networkClientDelegate.addAll(hrm.poll(nowMs)); networkClientDelegate.poll(0, nowMs); } - private void completePartitionRevocationOnClose(final Timer timer) throws ExecutionException, InterruptedException, TimeoutException { - CompletableFuture leaveGroupFuture = membershipManager.get().leaveGroup(); - while (timer.notExpired() && !leaveGroupFuture.isDone()) { - networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs()); - if (pollAssignmentChangeEvents(timer)) { - // Expecting a partitionRevocation completion event from the application thread - // Once completed. We are done with partition revocation and - break; - } - timer.update(); - } - } - - /** - * We need to continue to poll the ApplicationEventQueue during closing so that we can process the callback - * completion events and finish the leave group. - */ - private boolean pollAssignmentChangeEvents(final Timer timer) { - // TODO: To be implemented with KAFKA-15276 - return true; - } - private void ensureCoordinatorReady(final Timer timer) { while (!coordinatorReady() && timer.notExpired()) { findCoordinatorSync(timer); } } - private boolean coordinatorReady() { + boolean coordinatorReady() { CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.orElseThrow( () -> new IllegalStateException("CoordinatorRequestManager uninitialized.")); Optional coordinator = coordinatorRequestManager.coordinator(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java index 4727daa0f64b8..0513ce1f244ae 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java @@ -117,6 +117,11 @@ public interface MembershipManager { */ CompletableFuture leaveGroup(); + /** + * Leaving the group when the user closes the consumer. + */ + void leaveGroupOnClose(); + /** * @return True if the member should send heartbeat to the coordinator without waiting for * the interval. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java index c6393bbeefcd4..8d80a1fd20547 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java @@ -104,14 +104,14 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource /** * TopicPartition comparator based on topic name and partition id. */ - private final static TopicPartitionComparator TOPIC_PARTITION_COMPARATOR = + final static TopicPartitionComparator TOPIC_PARTITION_COMPARATOR = new TopicPartitionComparator(); /** * TopicIdPartition comparator based on topic name and partition id (ignoring ID while sorting, * as this is sorted mainly for logging purposes). */ - private final static TopicIdPartitionComparator TOPIC_ID_PARTITION_COMPARATOR = + final static TopicIdPartitionComparator TOPIC_ID_PARTITION_COMPARATOR = new TopicIdPartitionComparator(); /** @@ -481,6 +481,28 @@ public CompletableFuture leaveGroup() { return leaveResult; } + /** + * When closing down the consumer. The subscriptions are reset by the application thread therefore we just need to + * transition the state to LEAVING and set the epoch to -1/-2. + */ + @Override + public void leaveGroupOnClose() { + if (state == MemberState.UNSUBSCRIBED || + state == MemberState.FATAL || + state == MemberState.LEAVING) { + return; + } + + if (state == MemberState.PREPARE_LEAVING) { + transitionToSendingLeaveGroup(); + return; + } + + transitionTo(MemberState.PREPARE_LEAVING); + transitionToSendingLeaveGroup(); + leaveGroupInProgress = Optional.of(CompletableFuture.completedFuture(null)); + } + /** * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or * onPartitionsLost. diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index d39b63e979022..1da593446718a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.clients.consumer.internals; -import java.util.Locale; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; @@ -81,6 +80,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -97,6 +97,7 @@ import static java.util.Arrays.asList; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; +import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_ID; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; @@ -129,8 +130,8 @@ public class AsyncKafkaConsumerTest { @BeforeEach public void setup() { - // By default, the consumer is part of a group and autoCommit is enabled. - setup(ConsumerTestBuilder.createDefaultGroupInformation(), true); + // By default, the consumer is part of a group and autoCommit is disabled. + setup(ConsumerTestBuilder.createDefaultGroupInformation(), false); } private void setup(Optional groupInfo, boolean enableAutoCommit) { @@ -164,14 +165,10 @@ private void resetWithAutoCommitEnabled() { setup(ConsumerTestBuilder.createDefaultGroupInformation(), true); } - @Test - public void testSuccessfulStartupShutdown() { - assertDoesNotThrow(() -> consumer.close()); - } - @Test public void testSuccessfulStartupShutdownWithAutoCommit() { resetWithAutoCommitEnabled(); + prepareCoordinatorResponse(DEFAULT_GROUP_ID, Errors.NONE); TopicPartition tp = new TopicPartition("topic", 0); consumer.assign(singleton(tp)); consumer.seek(tp, 100); @@ -187,6 +184,7 @@ public void testInvalidGroupId() { @Test public void testFailOnClosedConsumer() { + prepareCoordinatorResponse(DEFAULT_GROUP_ID, Errors.NONE); consumer.close(); final IllegalStateException res = assertThrows(IllegalStateException.class, consumer::assignment); assertEquals("This consumer has already been closed.", res.getMessage()); @@ -325,7 +323,6 @@ public void testWakeupAfterEmptyFetch() { assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); } - @Test public void testWakeupAfterNonEmptyFetch() { final String topicName = "foo"; final int partition = 3; @@ -427,6 +424,7 @@ public void testCommitSyncLeaderEpochUpdate() { CompletableFuture commitFuture = new CompletableFuture<>(); commitFuture.complete(null); + prepareCommit(Arrays.asList(t1, t0), DEFAULT_GROUP_ID, Errors.NONE); try (MockedConstruction ignored = commitEventMocker(commitFuture)) { assertDoesNotThrow(() -> consumer.commitSync(topicPartitionOffsets)); } @@ -445,10 +443,15 @@ public void testCommitAsyncLeaderEpochUpdate() { topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L, Optional.of(1), "")); consumer.assign(Arrays.asList(t0, t1)); + consumer.seek(t0, 10); + consumer.seek(t1, 20); CompletableFuture commitFuture = new CompletableFuture<>(); commitFuture.complete(null); + prepareCommit(Arrays.asList(t1, t0), DEFAULT_GROUP_ID, Errors.NONE); + // TODO: The log shows NPE thrown from the CommitRequestManager, which is caused by the use of mock. + // Ideally, we will need to abandon the use of spy() in the testBuilder and mock the dependencies instead. try (MockedConstruction ignored = commitEventMocker(commitFuture)) { assertDoesNotThrow(() -> consumer.commitAsync(topicPartitionOffsets, callback)); } @@ -474,6 +477,8 @@ public void testEnsurePollExecutedCommitAsyncCallbacks() { public void testEnsureShutdownExecutedCommitAsyncCallbacks() { MockCommitCallback callback = new MockCommitCallback(); CompletableFuture future = new CompletableFuture<>(); + testBuilder.client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, DEFAULT_GROUP_ID, + testBuilder.metadata.fetch().nodes().get(0))); doReturn(future).when(consumer).commit(new HashMap<>(), false); assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); future.complete(null); @@ -953,17 +958,23 @@ public void testGroupIdNull() { @Test public void testGroupIdNotNullAndValid() { - final Properties props = requiredConsumerPropertiesAndGroupId("consumerGroupA"); + // close the default consumer + shutDown(); + final Properties props = requiredConsumerProperties(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000); props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true); final ConsumerConfig config = new ConsumerConfig(props); - - try (final AsyncKafkaConsumer consumer = - new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) { + try { + AsyncKafkaConsumer consumer = + new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer()); assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); } catch (final Exception exception) { throw new AssertionFailedError("The following exception was not expected:", exception); + } finally { + // We need 0ms shutdown to avoid leaving the test hanging for the default duration + consumer.close(Duration.ofMillis(0)); } } @@ -1113,21 +1124,9 @@ private HashMap mockTimestampToSearch() { return timestampToSearch; } - private void prepAutocommitOnClose() { - Node node = testBuilder.metadata.fetch().nodes().get(0); - testBuilder.client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); - if (!testBuilder.subscriptions.allConsumed().isEmpty()) { - List topicPartitions = new ArrayList<>(testBuilder.subscriptions.assignedPartitionsList()); - testBuilder.client.prepareResponse(mockAutocommitResponse( - topicPartitions, - (short) 1, - Errors.NONE).responseBody()); - } - } - - private ClientResponse mockAutocommitResponse(final List topicPartitions, - final short apiKeyVersion, - final Errors error) { + private ClientResponse mockCommitResponse(final List topicPartitions, + final short apiKeyVersion, + final Errors error) { OffsetCommitResponseData responseData = new OffsetCommitResponseData(); List responseTopics = new ArrayList<>(); topicPartitions.forEach(tp -> { @@ -1153,5 +1152,33 @@ private ClientResponse mockAutocommitResponse(final List topicPa new OffsetCommitResponse(responseData) ); } + + private void prepareCommit(List topicPartitions, String groupId, Errors error) { + testBuilder.client.prepareResponse(FindCoordinatorResponse.prepareResponse(error, groupId, + testBuilder.metadata.fetch().nodes().get(0))); + testBuilder.client.prepareResponse(mockCommitResponse( + topicPartitions, + ApiKeys.OFFSET_COMMIT.latestVersion(), + Errors.NONE).responseBody()); + } + + private void prepareCoordinatorResponse(String groupId, Errors error) { + Node node = testBuilder.metadata.fetch().nodes().get(0); + testBuilder.client.prepareResponse(FindCoordinatorResponse.prepareResponse(error, groupId, + node)); + } + + + private void prepAutocommitOnClose() { + Node node = testBuilder.metadata.fetch().nodes().get(0); + testBuilder.client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, DEFAULT_GROUP_ID, + node)); + if (!testBuilder.subscriptions.allConsumed().isEmpty()) { + doAnswer(i -> { + System.out.println("auto commit sync on close"); + return null; + }).when(consumer).commitSync(any(), any()); + } + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java index c478fc39290a2..42b7682b0d70f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java @@ -56,7 +56,7 @@ import java.util.concurrent.CompletableFuture; import static java.util.Collections.singleton; -import static java.util.Collections.singletonMap; +import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_ID; import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS; import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; @@ -67,6 +67,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -85,6 +86,8 @@ public class ConsumerNetworkThreadTest { private OffsetsRequestManager offsetsRequestManager; private CommitRequestManager commitRequestManager; private CoordinatorRequestManager coordinatorRequestManager; + private HeartbeatRequestManager heartbeatRequestManager; + private MembershipManager memberhipsManager; private ConsumerNetworkThread consumerNetworkThread; private MockClient client; private SubscriptionState subscriptions; @@ -101,6 +104,8 @@ public void setup() { commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); offsetsRequestManager = testBuilder.offsetsRequestManager; coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); + heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new); + memberhipsManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new); consumerNetworkThread = testBuilder.consumerNetworkThread; subscriptions = testBuilder.subscriptions; consumerNetworkThread.initializeResources(); @@ -293,11 +298,25 @@ void testCoordinatorConnectionOnClose() { Node node = metadata.fetch().nodes().get(0); coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds()); client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); - prepareOffsetCommitRequest(singletonMap(tp, 100L), Errors.NONE, false); consumerNetworkThread.cleanup(); assertTrue(coordinatorRequestManager.coordinator().isPresent()); - assertFalse(client.hasPendingResponses(), "There should be no pending responses but found " + client.futureResponses()); - assertFalse(client.hasInFlightRequests(), "There should be no pending requests, but found " + client.requests()); + assertFalse(client.hasPendingResponses(), + "There should be 0 pending response but found " + client.futureResponses().size()); + assertFalse(client.hasInFlightRequests(), + "There should be 0 pending request, but found " + client.requests().size()); + } + + @Test + void testEnsurePollHeartbeatOnClose() { + when(memberhipsManager.shouldSkipHeartbeat()).thenReturn(false); + Node node = metadata.fetch().nodes().get(0); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, DEFAULT_GROUP_ID, node)); + consumerNetworkThread.cleanup(); + doAnswer(invocation -> { + System.out.println("leave group on close"); + return null; + }).when(memberhipsManager).leaveGroupOnClose(); + verify(heartbeatRequestManager).poll(anyLong()); } private void prepareTearDown() { From e22a19f4c918b38639274c49155c2e1d09673d29 Mon Sep 17 00:00:00 2001 From: Philip Nee Date: Tue, 5 Dec 2023 13:24:55 -0800 Subject: [PATCH 3/3] Update AsyncKafkaConsumer.java --- .../kafka/clients/consumer/internals/AsyncKafkaConsumer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 391120b9f99b5..918a170598f4a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1074,9 +1074,9 @@ private void prepareShutdown(final Timer timer) { return; try { - timer.update(); // If the consumer is in a group, we will pause and revoke all assigned partitions onLeavePrepare().get(timer.remainingMs(), TimeUnit.MILLISECONDS); + timer.update(); } catch (Exception e) { Exception exception = e; if (e instanceof ExecutionException)