From 1fe3f02e637f55939cd23249b0e10f9a635cbae9 Mon Sep 17 00:00:00 2001 From: Philip Nee Date: Wed, 29 Nov 2023 14:57:40 -0800 Subject: [PATCH 01/14] Refactor consumer close --- .../internals/AsyncKafkaConsumer.java | 148 ++++++++++++++++-- .../internals/CommitRequestManager.java | 5 +- .../internals/ConsumerNetworkThread.java | 76 +-------- .../consumer/internals/MembershipManager.java | 5 + .../internals/MembershipManagerImpl.java | 26 ++- .../internals/NetworkClientDelegate.java | 33 ++-- .../internals/events/ApplicationEvent.java | 3 +- .../events/ApplicationEventProcessor.java | 44 +++++- .../events/ConsumerCloseApplicationEvent.java | 66 ++++++++ .../ApplicationEventProcessorTest.java | 96 ++++++++++++ .../internals/AsyncKafkaConsumerTest.java | 19 ++- .../internals/CommitRequestManagerTest.java | 16 ++ .../internals/ConsumerNetworkThreadTest.java | 38 +---- .../internals/ConsumerTestBuilder.java | 3 +- 14 files changed, 444 insertions(+), 134 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerCloseApplicationEvent.java create mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventProcessorTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 2d5906320b6e3..a6a999f12d024 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -45,6 +45,7 @@ import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.ConsumerCloseApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent; @@ -84,6 +85,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; import org.slf4j.Logger; +import org.slf4j.event.Level; import java.net.InetSocketAddress; import java.time.Duration; @@ -94,14 +96,17 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -124,6 +129,7 @@ import static org.apache.kafka.common.utils.Utils.closeQuietly; import static org.apache.kafka.common.utils.Utils.isBlank; import static org.apache.kafka.common.utils.Utils.join; +import static org.apache.kafka.common.utils.Utils.swallow; /** * This {@link Consumer} implementation uses an {@link ApplicationEventHandler event handler} to process @@ -245,7 +251,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { private final ApplicationEventHandler applicationEventHandler; private final Time time; - private Optional groupMetadata; + private Optional groupMetadata = Optional.empty(); private final KafkaConsumerMetrics kafkaConsumerMetrics; private Logger log; private final String clientId; @@ -268,6 +274,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { private final Metrics metrics; private final long retryBackoffMs; private final int defaultApiTimeoutMs; + private final boolean autoCommitEnabled; private volatile boolean closed = false; private final List assignors; private final Optional clientTelemetryReporter; @@ -313,6 +320,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { GroupRebalanceConfig.ProtocolType.CONSUMER ); this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); + this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); LogContext logContext = createLogContext(config, groupRebalanceConfig); this.log = logContext.logger(getClass()); @@ -371,6 +379,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { clientTelemetryReporter); final Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, metadata, + networkClientDelegateSupplier, applicationEventQueue, requestManagersSupplier); this.applicationEventHandler = applicationEventHandlerFactory.build( @@ -434,6 +443,51 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { } // Visible for testing + AsyncKafkaConsumer(LogContext logContext, + String clientId, + Deserializers deserializers, + FetchBuffer fetchBuffer, + FetchCollector fetchCollector, + ConsumerInterceptors interceptors, + Time time, + ApplicationEventHandler applicationEventHandler, + BlockingQueue backgroundEventQueue, + ConsumerRebalanceListenerInvoker rebalanceListenerInvoker, + Metrics metrics, + SubscriptionState subscriptions, + ConsumerMetadata metadata, + long retryBackoffMs, + int defaultApiTimeoutMs, + List assignors, + String groupId, + boolean autoCommitEnabled) { + this.log = logContext.logger(getClass()); + this.subscriptions = subscriptions; + this.clientId = clientId; + this.fetchBuffer = fetchBuffer; + this.fetchCollector = fetchCollector; + this.isolationLevel = IsolationLevel.READ_UNCOMMITTED; + this.interceptors = Objects.requireNonNull(interceptors); + this.time = time; + this.backgroundEventProcessor = new BackgroundEventProcessor( + logContext, + backgroundEventQueue, + applicationEventHandler, + rebalanceListenerInvoker + ); + this.metrics = metrics; + this.groupMetadata = initializeGroupMetadata(groupId, Optional.empty()); + this.metadata = metadata; + this.retryBackoffMs = retryBackoffMs; + this.defaultApiTimeoutMs = defaultApiTimeoutMs; + this.deserializers = deserializers; + this.applicationEventHandler = applicationEventHandler; + this.assignors = assignors; + this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer"); + this.clientTelemetryReporter = Optional.empty(); + this.autoCommitEnabled = autoCommitEnabled; + } + AsyncKafkaConsumer(LogContext logContext, Time time, ConsumerConfig config, @@ -446,6 +500,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { this.log = logContext.logger(getClass()); this.subscriptions = subscriptions; this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); + this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); this.fetchBuffer = new FetchBuffer(logContext); this.isolationLevel = IsolationLevel.READ_UNCOMMITTED; this.interceptors = new ConsumerInterceptors<>(Collections.emptyList()); @@ -517,6 +572,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier( logContext, metadata, + networkClientDelegateSupplier, applicationEventQueue, requestManagersSupplier ); @@ -1159,15 +1215,14 @@ private void close(Duration timeout, boolean swallowException) { final Timer closeTimer = time.timer(timeout); clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis())); closeTimer.update(); - + // Prepare shutting down the network thread + prepareShutdown(closeTimer, firstException); + closeTimer.update(); if (applicationEventHandler != null) - closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed to close application event handler with a timeout(ms)=" + closeTimer.remainingMs(), firstException); - - // Invoke all callbacks after the background thread exists in case if there are unsent async - // commits - maybeInvokeCommitCallbacks(); - - closeQuietly(fetchBuffer, "Failed to close the fetch buffer", firstException); + closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException); + closeTimer.update(); + // Ensure all async commit callbacks are invoked + swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback", this::maybeInvokeCommitCallbacks, firstException); closeQuietly(interceptors, "consumer interceptors", firstException); closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException); closeQuietly(metrics, "consumer metrics", firstException); @@ -1185,6 +1240,81 @@ private void close(Duration timeout, boolean swallowException) { } } + /** + * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: + * 1. autocommit offsets + * 2. revoke all partitions + */ + private void prepareShutdown(final Timer timer, final AtomicReference firstException) { + if (!groupMetadata.isPresent()) + return; + maybeAutoCommitSync(timer, firstException); + timer.update(); + waitOnEventCompletion(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.COMMIT, timer.remainingMs()), timer, firstException); + maybeInvokeCommitCallbacks(); + maybeRevokePartitions(timer, firstException); + waitOnEventCompletion(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.LEAVE_GROUP, timer.remainingMs()), timer, firstException); + } + + private void waitOnEventCompletion(final ConsumerCloseApplicationEvent event, + final Timer timer, + final AtomicReference firstException) { + try { + applicationEventHandler.addAndGet(event, timer); + } catch (TimeoutException e) { + log.debug("Timeout of {}ms expired before the {} operation could complete.", + timer.remainingMs(), + event.task()); + } catch (Exception e) { + firstException.compareAndSet(null, e); + } finally { + timer.update(); + } + } + + private void maybeRevokePartitions(final Timer timer, final AtomicReference firstException) { + if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty()) + return; + try { + // If the consumer is in a group, we will pause and revoke all assigned partitions + onLeavePrepare().get(timer.remainingMs(), TimeUnit.MILLISECONDS); + } catch (Exception e) { + Exception exception = e; + if (e instanceof ExecutionException) + exception = (Exception) e.getCause(); + firstException.compareAndSet(null, exception); + } finally { + subscriptions.assignFromSubscribed(Collections.emptySet()); + timer.update(); + } + } + + private void maybeAutoCommitSync(final Timer timer, final AtomicReference firstException) { + if (autoCommitEnabled) { + Map allConsumed = subscriptions.allConsumed(); + try { + log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed); + commitSync(allConsumed, Duration.ofMillis(timer.remainingMs())); + } catch (TimeoutException e) { + log.debug("Timeout of {}ms expired before the auto commit could complete.", + timer.remainingMs()); + } catch (Exception e) { + // consistent with async auto-commit failures, we do not propagate the exception + log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage()); + firstException.compareAndSet(null, e); + } + } + } + + private CompletableFuture onLeavePrepare() { + SortedSet droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR); + droppedPartitions.addAll(subscriptions.assignedPartitions()); + if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) + return CompletableFuture.completedFuture(null); + // TODO: Invoke rebalanceListener via KAFKA-15276 + return CompletableFuture.completedFuture(null); + } + @Override public void wakeup() { wakeupTrigger.wakeup(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index fea3b21d26552..8d052e47ab17d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -317,10 +317,11 @@ public void resetAutoCommitTimer() { */ @Override public NetworkClientDelegate.PollResult pollOnClose() { - if (!pendingRequests.hasUnsentRequests() || !coordinatorRequestManager.coordinator().isPresent()) + if (!pendingRequests.hasUnsentRequests()) return EMPTY; List requests = pendingRequests.drainOnClose(); + System.out.print("ddraining + " + requests); return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, requests); } @@ -785,7 +786,7 @@ private void clearAll() { unsentOffsetFetches.clear(); } - private List drainOnClose() { + public List drainOnClose() { ArrayList res = new ArrayList<>(); res.addAll(unsentOffsetCommits.stream().map(OffsetCommitRequestState::toUnsentRequest).collect(Collectors.toList())); clearAll(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index cacc3e398aa9b..f6155e7aed1eb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -16,12 +16,10 @@ */ package org.apache.kafka.clients.consumer.internals; -import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; -import org.apache.kafka.common.Node; import org.apache.kafka.common.internals.IdempotentCloser; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.utils.KafkaThread; @@ -33,11 +31,9 @@ import java.io.Closeable; import java.time.Duration; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -144,7 +140,6 @@ void runOnce() { .map(networkClientDelegate::addAll) .reduce(MAX_POLL_TIMEOUT_MS, Math::min); networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs); - cachedMaximumTimeToWait = requestManagers.entries().stream() .filter(Optional::isPresent) .map(Optional::get) @@ -195,11 +190,11 @@ static void runAtClose(final Collection> requ // Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until // all requests have received a response. - do { + while (timer.notExpired() && !requestFutures.stream().allMatch(Future::isDone)) { pollWaitTimeMs = Math.min(pollWaitTimeMs, timer.remainingMs()); networkClientDelegate.poll(pollWaitTimeMs, timer.currentTimeMs()); timer.update(); - } while (timer.notExpired() && !requestFutures.stream().allMatch(Future::isDone)); + } } public boolean isRunning() { @@ -275,79 +270,18 @@ private void closeInternal(final Duration timeout) { } void cleanup() { + log.trace("Closing the consumer network thread"); + Timer timer = time.timer(closeTimeout); try { - log.trace("Closing the consumer network thread"); - Timer timer = time.timer(closeTimeout); - maybeAutocommitOnClose(timer); runAtClose(requestManagers.entries(), networkClientDelegate, timer); - maybeLeaveGroup(timer); } catch (Exception e) { log.error("Unexpected error during shutdown. Proceed with closing.", e); } finally { + networkClientDelegate.awaitPendingRequests(timer); closeQuietly(requestManagers, "request managers"); closeQuietly(networkClientDelegate, "network client delegate"); closeQuietly(applicationEventProcessor, "application event processor"); log.debug("Closed the consumer network thread"); } } - - /** - * We need to autocommit before shutting down the consumer. The method needs to first connect to the coordinator - * node to construct the closing requests. Then wait for all closing requests to finish before returning. The - * method is bounded by a closing timer. We will continue closing down the consumer if the requests cannot be - * completed in time. - */ - // Visible for testing - void maybeAutocommitOnClose(final Timer timer) { - if (!requestManagers.coordinatorRequestManager.isPresent()) - return; - - if (!requestManagers.commitRequestManager.isPresent()) { - log.error("Expecting a CommitRequestManager but the object was never initialized. Shutting down."); - return; - } - - if (!requestManagers.commitRequestManager.get().canAutoCommit()) { - return; - } - - ensureCoordinatorReady(timer); - NetworkClientDelegate.UnsentRequest autocommitRequest = - requestManagers.commitRequestManager.get().createCommitAllConsumedRequest(); - networkClientDelegate.add(autocommitRequest); - do { - long currentTimeMs = timer.currentTimeMs(); - ensureCoordinatorReady(timer); - networkClientDelegate.poll(timer.remainingMs(), currentTimeMs); - } while (timer.notExpired() && !autocommitRequest.future().isDone()); - } - - void maybeLeaveGroup(final Timer timer) { - // TODO: Leave group upon closing the consumer - } - - private void ensureCoordinatorReady(final Timer timer) { - while (!coordinatorReady() && timer.notExpired()) { - findCoordinatorSync(timer); - } - } - - private boolean coordinatorReady() { - CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.orElseThrow( - () -> new IllegalStateException("CoordinatorRequestManager uninitialized.")); - Optional coordinator = coordinatorRequestManager.coordinator(); - return coordinator.isPresent() && !networkClientDelegate.isUnavailable(coordinator.get()); - } - - private void findCoordinatorSync(final Timer timer) { - CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.orElseThrow( - () -> new IllegalStateException("CoordinatorRequestManager uninitialized.")); - NetworkClientDelegate.UnsentRequest request = coordinatorRequestManager.makeFindCoordinatorRequest(timer.currentTimeMs()); - networkClientDelegate.addAll(Collections.singletonList(request)); - CompletableFuture findCoordinatorRequest = request.future(); - while (timer.notExpired() && !findCoordinatorRequest.isDone()) { - networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs()); - timer.update(); - } - } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java index aec25985093a6..810ca61c973d5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java @@ -126,6 +126,11 @@ public interface MembershipManager { */ CompletableFuture leaveGroup(); + /** + * Leaving the group when the user closes the consumer. + */ + void leaveGroupOnClose(); + /** * @return True if the member should send heartbeat to the coordinator without waiting for * the interval. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java index 1cdd26977d00a..631ce8b4abc2c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java @@ -114,14 +114,14 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource /** * TopicPartition comparator based on topic name and partition id. */ - private final static TopicPartitionComparator TOPIC_PARTITION_COMPARATOR = + final static TopicPartitionComparator TOPIC_PARTITION_COMPARATOR = new TopicPartitionComparator(); /** * TopicIdPartition comparator based on topic name and partition id (ignoring ID while sorting, * as this is sorted mainly for logging purposes). */ - private final static TopicIdPartitionComparator TOPIC_ID_PARTITION_COMPARATOR = + final static TopicIdPartitionComparator TOPIC_ID_PARTITION_COMPARATOR = new TopicIdPartitionComparator(); /** @@ -596,6 +596,28 @@ public CompletableFuture leaveGroup() { return leaveResult; } + /** + * When closing down the consumer. The partitions are already revoked by the application thread therefore we just + * need to transition the state to LEAVING and set the epoch to -1/-2. + */ + @Override + public void leaveGroupOnClose() { + if (state == MemberState.UNSUBSCRIBED || + state == MemberState.FATAL || + state == MemberState.LEAVING) { + return; + } + + if (state == MemberState.PREPARE_LEAVING) { + transitionToSendingLeaveGroup(); + return; + } + + transitionTo(MemberState.PREPARE_LEAVING); + transitionToSendingLeaveGroup(); + leaveGroupInProgress = Optional.of(CompletableFuture.completedFuture(null)); + } + /** * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or * onPartitionsLost. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index 141f5f955c8b5..10e0b345dcb71 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -64,10 +64,10 @@ public class NetworkClientDelegate implements AutoCloseable { private final long retryBackoffMs; public NetworkClientDelegate( - final Time time, - final ConsumerConfig config, - final LogContext logContext, - final KafkaClient client) { + final Time time, + final ConsumerConfig config, + final LogContext logContext, + final KafkaClient client) { this.time = time; this.client = client; this.log = logContext.logger(getClass()); @@ -130,6 +130,21 @@ public void poll(final long timeoutMs, final long currentTimeMs) { checkDisconnects(currentTimeMs); } + /** + * Block until all pending requests from the given node have finished. + */ + public void awaitPendingRequests(Timer timer) { + while (!unsentRequests().isEmpty() && timer.notExpired()) { + poll(timer.remainingMs(), timer.currentTimeMs()); + timer.update(); + } + + if (!unsentRequests.isEmpty()) { + log.warn("Close timed out with {} pending requests to coordinator, terminating client connections", + unsentRequests.size()); + } + } + /** * Tries to send the requests in the unsentRequest queue. If the request doesn't have an assigned node, it will * find the leastLoadedOne, and will be retried in the next {@code poll()}. If the request is expired, a @@ -310,11 +325,11 @@ Optional node() { @Override public String toString() { return "UnsentRequest{" + - "requestBuilder=" + requestBuilder + - ", handler=" + handler + - ", node=" + node + - ", timer=" + timer + - '}'; + "requestBuilder=" + requestBuilder + + ", handler=" + handler + + ", node=" + node + + ", timer=" + timer + + '}'; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index bc3c4f8d4a4a4..f68d5d515f05a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -26,7 +26,8 @@ public abstract class ApplicationEvent { public enum Type { COMMIT, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE, LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, SUBSCRIPTION_CHANGE, - UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED + UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED, + PREP_CLOSING } private final Type type; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index d7aade6e31a10..7620cfd7463d7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; import org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager; import org.apache.kafka.clients.consumer.internals.MembershipManager; +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate; import org.apache.kafka.clients.consumer.internals.RequestManagers; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.PartitionInfo; @@ -45,15 +46,19 @@ public class ApplicationEventProcessor extends EventProcessor private final Logger log; private final ConsumerMetadata metadata; private final RequestManagers requestManagers; + private final NetworkClientDelegate networkClientDelegate; public ApplicationEventProcessor(final LogContext logContext, final BlockingQueue applicationEventQueue, final RequestManagers requestManagers, - final ConsumerMetadata metadata) { + final ConsumerMetadata metadata, + final NetworkClientDelegate ncd) { super(logContext, applicationEventQueue); this.log = logContext.logger(ApplicationEventProcessor.class); this.requestManagers = requestManagers; this.metadata = metadata; + this.networkClientDelegate = ncd; + } /** @@ -116,6 +121,10 @@ public void process(ApplicationEvent event) { process((ConsumerRebalanceListenerCallbackCompletedEvent) event); return; + case PREP_CLOSING: + processPrepClosingEvent((ConsumerCloseApplicationEvent) event); + return; + default: log.warn("Application event type " + event.type() + " was not expected"); } @@ -242,23 +251,54 @@ private void process(final ConsumerRebalanceListenerCallbackCompletedEvent event manager.consumerRebalanceListenerCallbackCompleted(event); } + private void processPrepClosingEvent(ConsumerCloseApplicationEvent event) { + switch (event.task()) { + case COMMIT: + log.debug("Sending unsent commit before closing."); + sendUnsentCommit(); + event.future().complete(null); + break; + case LEAVE_GROUP: + log.debug("Leaving group before closing"); + requestManagers.membershipManager.ifPresent(MembershipManager::leaveGroupOnClose); + event.future().complete(null); + break; + default: + log.warn("Invalid ConsumerCloseApplicationEvent task {}", event.task()); + event.future().completeExceptionally(new KafkaException("Invalid closing task.")); + } + } + + private void sendUnsentCommit() { + if (!requestManagers.commitRequestManager.isPresent()) + return; + NetworkClientDelegate.PollResult res = requestManagers.commitRequestManager.get().pollOnClose(); + if (res.unsentRequests.isEmpty()) + return; + // NetworkThread will continue to poll the networkClientDelegate + networkClientDelegate.addAll(res); + } + /** * Creates a {@link Supplier} for deferred creation during invocation by * {@link ConsumerNetworkThread}. */ public static Supplier supplier(final LogContext logContext, final ConsumerMetadata metadata, + final Supplier ncdSupplier, final BlockingQueue applicationEventQueue, final Supplier requestManagersSupplier) { return new CachedSupplier() { @Override protected ApplicationEventProcessor create() { RequestManagers requestManagers = requestManagersSupplier.get(); + NetworkClientDelegate ncd = ncdSupplier.get(); return new ApplicationEventProcessor( logContext, applicationEventQueue, requestManagers, - metadata + metadata, + ncd ); } }; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerCloseApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerCloseApplicationEvent.java new file mode 100644 index 0000000000000..7fff3dfb8a0ff --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerCloseApplicationEvent.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +public class ConsumerCloseApplicationEvent extends CompletableApplicationEvent { + public enum Task { + COMMIT, LEAVE_GROUP + } + + private final Task task; + private final long timeout; + + public ConsumerCloseApplicationEvent(final Task task, final long timeout) { + super(ApplicationEvent.Type.PREP_CLOSING); + this.task = task; + this.timeout = timeout; + } + + public Task task() { + return task; + } + + public long timeout() { + return timeout; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ConsumerCloseApplicationEvent that = (ConsumerCloseApplicationEvent) o; + + return task == that.task; + } + + @Override + public int hashCode() { + return task.hashCode(); + } + + protected String toStringBase() { + return "task=" + task; + } + + @Override + public String toString() { + return "ConsumerCloseApplicationEvent{" + + toStringBase() + + '}'; + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventProcessorTest.java new file mode 100644 index 0000000000000..b53250f438d2c --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventProcessorTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.ConsumerCloseApplicationEvent; +import org.apache.kafka.common.utils.LogContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class ApplicationEventProcessorTest { + private ApplicationEventProcessor processor; + private BlockingQueue applicationEventQueue = mock(BlockingQueue.class); + private RequestManagers requestManagers; + private ConsumerMetadata metadata = mock(ConsumerMetadata.class); + private NetworkClientDelegate networkClientDelegate = mock(NetworkClientDelegate.class); + private OffsetsRequestManager offsetRequestManager; + private OffsetsRequestManager offsetsRequestManager; + private TopicMetadataRequestManager topicMetadataRequestManager; + private FetchRequestManager fetchRequestManager; + private CoordinatorRequestManager coordinatorRequestManager; + private CommitRequestManager commitRequestManager; + private HeartbeatRequestManager heartbeatRequestManager; + private MembershipManager membershipManager; + + @BeforeEach + @SuppressWarnings("unchecked") + public void setup() { + LogContext logContext = new LogContext(); + offsetRequestManager = mock(OffsetsRequestManager.class); + offsetsRequestManager = mock(OffsetsRequestManager.class); + topicMetadataRequestManager = mock(TopicMetadataRequestManager.class); + fetchRequestManager = mock(FetchRequestManager.class); + coordinatorRequestManager = mock(CoordinatorRequestManager.class); + commitRequestManager = mock(CommitRequestManager.class); + heartbeatRequestManager = mock(HeartbeatRequestManager.class); + membershipManager = mock(MembershipManager.class); + requestManagers = new RequestManagers( + logContext, + offsetsRequestManager, + topicMetadataRequestManager, + fetchRequestManager, + Optional.of(coordinatorRequestManager), + Optional.of(commitRequestManager), + Optional.of(heartbeatRequestManager), + Optional.of(membershipManager)); + processor = new ApplicationEventProcessor( + new LogContext(), + applicationEventQueue, + requestManagers, + metadata, + networkClientDelegate); + } + + @Test + public void testPrepClosingCommitEvents() { + List results = mockCommitResults(); + doReturn(new NetworkClientDelegate.PollResult(100, results)).when(commitRequestManager).pollOnClose(); + processor.process(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.COMMIT, 0)); + verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class)); + } + + @Test + public void testPrepClosingLeaveGroupEvent() { + processor.process(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.LEAVE_GROUP, 0)); + verify(membershipManager).leaveGroupOnClose(); + } + + private List mockCommitResults() { + return Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class)); + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 19734698afba9..db52b15339535 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.ConsumerCloseApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.EventProcessor; @@ -112,6 +113,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -340,7 +342,6 @@ public void testWakeupAfterEmptyFetch() { assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); } - @Test public void testWakeupAfterNonEmptyFetch() { consumer = newConsumer(); final String topicName = "foo"; @@ -456,6 +457,8 @@ public void testCommitAsyncLeaderEpochUpdate() { topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L, Optional.of(1), "")); consumer.assign(Arrays.asList(t0, t1)); + consumer.seek(t0, 10); + consumer.seek(t1, 20); assertDoesNotThrow(() -> consumer.commitAsync(topicPartitionOffsets, callback)); @@ -490,6 +493,15 @@ public void testEnsureShutdownExecutedCommitAsyncCallbacks() { null); } + @Test + public void testVerifyApplicationEventOnShutdown() { + consumer = newConsumer(); + doReturn(null).when(applicationEventHandler).addAndGet(any(), any()); + consumer.close(); + verify(applicationEventHandler, times(2)).addAndGet(any(ConsumerCloseApplicationEvent.class), any()); + verify(applicationEventHandler).add(any(CommitApplicationEvent.class)); + } + private void assertMockCommitCallbackInvoked(final Executable task, final MockCommitCallback callback, final Errors errors) { @@ -657,7 +669,7 @@ public void testOffsetsForTimesWithZeroTimeout() { Map result = assertDoesNotThrow(() -> consumer.offsetsForTimes(timestampToSearch, - Duration.ofMillis(0))); + Duration.ZERO)); assertEquals(expectedResult, result); verify(applicationEventHandler, never()).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); @@ -1005,7 +1017,8 @@ public void testGroupIdNull() { @Test public void testGroupIdNotNullAndValid() { - final Properties props = requiredConsumerPropertiesAndGroupId("consumerGroupA"); + final Properties props = requiredConsumerProperties(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000); props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true); final ConsumerConfig config = new ConsumerConfig(props); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 74d69d8d42aeb..d5a6f5002afe5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetCommitResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -458,6 +459,21 @@ public void testOffsetFetchRequest_PartitionDataError(final Errors error, final testNonRetriable(Collections.singletonList(future)); } + @Test + public void testPollOnClose() { + CommitRequestManager commitRequestManger = create(true, 100); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); + + Map offsets = Collections.singletonMap(new TopicPartition("topic", 1), + new OffsetAndMetadata(0)); + + commitRequestManger.addOffsetCommitRequest(offsets); + NetworkClientDelegate.PollResult res = commitRequestManger.pollOnClose(); + assertEquals(1, res.unsentRequests.size()); + OffsetCommitRequestData data = (OffsetCommitRequestData) res.unsentRequests.get(0).requestBuilder().build().data(); + assertEquals("topic", data.topics().get(0).name()); + } + private static void assertEmptyPendingRequests(CommitRequestManager commitRequestManger) { assertTrue(commitRequestManger.pendingRequests.inflightOffsetFetches.isEmpty()); assertTrue(commitRequestManger.pendingRequests.unsentOffsetFetches.isEmpty()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java index 0eefb59604f3a..d7db239f3d0e2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java @@ -55,8 +55,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; -import static java.util.Collections.singleton; -import static java.util.Collections.singletonMap; import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS; import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; @@ -85,6 +83,8 @@ public class ConsumerNetworkThreadTest { private OffsetsRequestManager offsetsRequestManager; private CommitRequestManager commitRequestManager; private CoordinatorRequestManager coordinatorRequestManager; + private HeartbeatRequestManager heartbeatRequestManager; + private MembershipManager memberhipsManager; private ConsumerNetworkThread consumerNetworkThread; private MockClient client; private SubscriptionState subscriptions; @@ -101,6 +101,8 @@ public void setup() { commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); offsetsRequestManager = testBuilder.offsetsRequestManager; coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); + heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new); + memberhipsManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new); consumerNetworkThread = testBuilder.consumerNetworkThread; subscriptions = testBuilder.subscriptions; consumerNetworkThread.initializeResources(); @@ -285,38 +287,6 @@ void testEnsureEventsAreCompleted() { assertTrue(applicationEventsQueue.isEmpty()); } - @Test - void testCoordinatorConnectionOnClose() { - TopicPartition tp = new TopicPartition("topic", 0); - subscriptions.assignFromUser(singleton(new TopicPartition("topic", 0))); - subscriptions.seekUnvalidated(tp, new SubscriptionState.FetchPosition(100)); - Node node = metadata.fetch().nodes().get(0); - coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds()); - client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); - prepareOffsetCommitRequest(singletonMap(tp, 100L), Errors.NONE, false); - consumerNetworkThread.cleanup(); - assertTrue(coordinatorRequestManager.coordinator().isPresent()); - assertFalse(client.hasPendingResponses()); - assertFalse(client.hasInFlightRequests()); - } - - @Test - void testAutoCommitOnClose() { - TopicPartition tp = new TopicPartition("topic", 0); - Node node = metadata.fetch().nodes().get(0); - subscriptions.assignFromUser(singleton(tp)); - subscriptions.seek(tp, 100); - coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds()); - client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); - prepareOffsetCommitRequest(singletonMap(tp, 100L), Errors.NONE, false); - consumerNetworkThread.maybeAutocommitOnClose(time.timer(1000)); - assertTrue(coordinatorRequestManager.coordinator().isPresent()); - verify(commitRequestManager).createCommitAllConsumedRequest(); - - assertFalse(client.hasPendingResponses()); - assertFalse(client.hasInFlightRequests()); - } - private void prepareTearDown() { Node node = metadata.fetch().nodes().get(0); client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java index f0d42a8ed3b2c..45c530eba927b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java @@ -260,7 +260,8 @@ public ConsumerTestBuilder(Optional groupInfo, boolean enableA logContext, applicationEventQueue, requestManagers, - metadata) + metadata, + networkClientDelegate) ); ConsumerCoordinatorMetrics consumerCoordinatorMetrics = new ConsumerCoordinatorMetrics( subscriptions, From 3948dc7f4c9b4d5b9e4145c3f3830ed993ceeb0a Mon Sep 17 00:00:00 2001 From: Philip Nee Date: Wed, 6 Dec 2023 15:54:47 -0800 Subject: [PATCH 02/14] clean up based on comments more clean up clean up clean up fix broken tests clean up refactor based on PR comment clean up --- .../internals/AsyncKafkaConsumer.java | 98 +++++++++---------- .../internals/CommitRequestManager.java | 21 ++-- .../internals/ConsumerNetworkThread.java | 33 +++---- .../internals/FetchRequestManager.java | 15 +-- .../consumer/internals/MembershipManager.java | 5 - .../internals/MembershipManagerImpl.java | 35 ++----- .../internals/NetworkClientDelegate.java | 25 +---- .../consumer/internals/RequestManager.java | 7 ++ .../internals/events/ApplicationEvent.java | 2 +- .../events/ApplicationEventProcessor.java | 55 ++++------- .../events/CommitOnCloseApplicationEvent.java | 31 ++++++ .../events/ConsumerCloseApplicationEvent.java | 66 ------------- .../events/LeaveOnCloseApplicationEvent.java | 30 ++++++ .../org/apache/kafka/common/utils/Utils.java | 31 +++--- .../ApplicationEventProcessorTest.java | 22 +++-- .../internals/AsyncKafkaConsumerTest.java | 39 +++++++- .../internals/CommitRequestManagerTest.java | 5 +- .../internals/ConsumerTestBuilder.java | 4 +- .../internals/FetchRequestManagerTest.java | 3 +- .../internals/MembershipManagerImplTest.java | 23 +++++ 20 files changed, 281 insertions(+), 269 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java delete mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerCloseApplicationEvent.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseApplicationEvent.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index a6a999f12d024..469a5a038b718 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -45,7 +45,6 @@ import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; -import org.apache.kafka.clients.consumer.internals.events.ConsumerCloseApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent; @@ -84,6 +83,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.event.Level; @@ -106,7 +106,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -379,8 +378,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { clientTelemetryReporter); final Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, metadata, - networkClientDelegateSupplier, - applicationEventQueue, + applicationEventQueue, requestManagersSupplier); this.applicationEventHandler = applicationEventHandlerFactory.build( logContext, @@ -572,8 +570,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier( logContext, metadata, - networkClientDelegateSupplier, - applicationEventQueue, + applicationEventQueue, requestManagersSupplier ); this.applicationEventHandler = new ApplicationEventHandler(logContext, @@ -1222,7 +1219,6 @@ private void close(Duration timeout, boolean swallowException) { closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException); closeTimer.update(); // Ensure all async commit callbacks are invoked - swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback", this::maybeInvokeCommitCallbacks, firstException); closeQuietly(interceptors, "consumer interceptors", firstException); closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException); closeQuietly(metrics, "consumer metrics", firstException); @@ -1245,68 +1241,62 @@ private void close(Duration timeout, boolean swallowException) { * 1. autocommit offsets * 2. revoke all partitions */ - private void prepareShutdown(final Timer timer, final AtomicReference firstException) { + void prepareShutdown(final Timer timer, final AtomicReference firstException) { if (!groupMetadata.isPresent()) return; - maybeAutoCommitSync(timer, firstException); + maybeAutoCommitSync(autoCommitEnabled, timer, firstException); timer.update(); - waitOnEventCompletion(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.COMMIT, timer.remainingMs()), timer, firstException); - maybeInvokeCommitCallbacks(); - maybeRevokePartitions(timer, firstException); - waitOnEventCompletion(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.LEAVE_GROUP, timer.remainingMs()), timer, firstException); + applicationEventHandler.add(new CommitOnCloseApplicationEvent()); + maybeRevokePartitions(firstException); + completeSilently( + () -> { + applicationEventHandler.addAndGet(new LeaveOnCloseApplicationEvent(), timer); + timer.update(); + }, + "Failed to send leaveGroup heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException); + swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback.", this::maybeInvokeCommitCallbacks, + firstException); } - private void waitOnEventCompletion(final ConsumerCloseApplicationEvent event, - final Timer timer, - final AtomicReference firstException) { - try { - applicationEventHandler.addAndGet(event, timer); - } catch (TimeoutException e) { - log.debug("Timeout of {}ms expired before the {} operation could complete.", - timer.remainingMs(), - event.task()); - } catch (Exception e) { - firstException.compareAndSet(null, e); - } finally { + // Visible for testing + void maybeAutoCommitSync(final boolean shouldAutoCommit, + final Timer timer, + final AtomicReference firstException) { + if (!shouldAutoCommit) + return; + completeSilently(() -> { + Map allConsumed = subscriptions.allConsumed(); + log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed); + commitSync(allConsumed, Duration.ofMillis(timer.remainingMs())); timer.update(); - } + }, "Failed autoCommitSync with a timeout(ms)=" + timer.timeoutMs(), firstException); } - private void maybeRevokePartitions(final Timer timer, final AtomicReference firstException) { + // Visible for testing + void maybeRevokePartitions(final AtomicReference firstException) { if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty()) return; - try { - // If the consumer is in a group, we will pause and revoke all assigned partitions - onLeavePrepare().get(timer.remainingMs(), TimeUnit.MILLISECONDS); - } catch (Exception e) { - Exception exception = e; - if (e instanceof ExecutionException) - exception = (Exception) e.getCause(); - firstException.compareAndSet(null, exception); - } finally { - subscriptions.assignFromSubscribed(Collections.emptySet()); - timer.update(); - } + CompletableFuture revocationFuture = invokePartitionRevocationListener(); + completeSilently(revocationFuture::get, + "Failed revoking partitions of " + subscriptions.assignedPartitions(), + firstException); + subscriptions.assignFromSubscribed(Collections.emptySet()); } - private void maybeAutoCommitSync(final Timer timer, final AtomicReference firstException) { - if (autoCommitEnabled) { - Map allConsumed = subscriptions.allConsumed(); - try { - log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed); - commitSync(allConsumed, Duration.ofMillis(timer.remainingMs())); - } catch (TimeoutException e) { - log.debug("Timeout of {}ms expired before the auto commit could complete.", - timer.remainingMs()); - } catch (Exception e) { - // consistent with async auto-commit failures, we do not propagate the exception - log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage()); - firstException.compareAndSet(null, e); - } + // Visible for testing + void completeSilently(final Utils.ThrowingRunnable function, + final String msg, + final AtomicReference firstException) { + try { + function.run(); + } catch (TimeoutException e) { + log.debug("Timeout expired before the {} operation could complete.", msg); + } catch (Exception e) { + firstException.compareAndSet(null, e); } } - private CompletableFuture onLeavePrepare() { + private CompletableFuture invokePartitionRevocationListener() { SortedSet droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR); droppedPartitions.addAll(subscriptions.assignedPartitions()); if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 8d052e47ab17d..6316ead793400 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -80,6 +80,7 @@ public class CommitRequestManager implements RequestManager { private final OptionalDouble jitter; private final boolean throwOnFetchStableOffsetUnsupported; final PendingRequests pendingRequests; + private boolean closing = false; public CommitRequestManager( final Time time, @@ -153,6 +154,10 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { if (!coordinatorRequestManager.coordinator().isPresent()) return EMPTY; + if (closing) { + return drainPendingOffsetCommitRequests(); + } + maybeAutoCommitAllConsumed(); if (!pendingRequests.hasUnsentRequests()) return EMPTY; @@ -165,6 +170,11 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { return new NetworkClientDelegate.PollResult(timeUntilNextPoll, requests); } + @Override + public void signalClose() { + closing = true; + } + /** * Returns the delay for which the application thread can safely wait before it should be responsive * to results from the request managers. For example, the subscription state can change when heartbeats @@ -315,13 +325,10 @@ public void resetAutoCommitTimer() { * Drains the inflight offsetCommits during shutdown because we want to make sure all pending commits are sent * before closing. */ - @Override - public NetworkClientDelegate.PollResult pollOnClose() { - if (!pendingRequests.hasUnsentRequests()) + public NetworkClientDelegate.PollResult drainPendingOffsetCommitRequests() { + if (pendingRequests.unsentOffsetCommits.isEmpty()) return EMPTY; - - List requests = pendingRequests.drainOnClose(); - System.out.print("ddraining + " + requests); + List requests = pendingRequests.drainPendingCommits(); return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, requests); } @@ -786,7 +793,7 @@ private void clearAll() { unsentOffsetFetches.clear(); } - public List drainOnClose() { + private List drainPendingCommits() { ArrayList res = new ArrayList<>(); res.addAll(unsentOffsetCommits.stream().map(OffsetCommitRequestState::toUnsentRequest).collect(Collectors.toList())); clearAll(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index f6155e7aed1eb..a49343f4dbefd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -34,9 +34,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.Future; import java.util.function.Supplier; -import java.util.stream.Collectors; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS; import static org.apache.kafka.common.utils.Utils.closeQuietly; @@ -174,27 +172,11 @@ static void runAtClose(final Collection> requ final NetworkClientDelegate networkClientDelegate, final Timer timer) { // These are the optional outgoing requests at the - List pollResults = requestManagers.stream() + requestManagers.stream() .filter(Optional::isPresent) .map(Optional::get) .map(RequestManager::pollOnClose) - .collect(Collectors.toList()); - long pollWaitTimeMs = pollResults.stream() - .map(networkClientDelegate::addAll) - .reduce(MAX_POLL_TIMEOUT_MS, Math::min); - - List> requestFutures = pollResults.stream() - .flatMap(fads -> fads.unsentRequests.stream()) - .map(NetworkClientDelegate.UnsentRequest::future) - .collect(Collectors.toList()); - - // Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until - // all requests have received a response. - while (timer.notExpired() && !requestFutures.stream().allMatch(Future::isDone)) { - pollWaitTimeMs = Math.min(pollWaitTimeMs, timer.remainingMs()); - networkClientDelegate.poll(pollWaitTimeMs, timer.currentTimeMs()); - timer.update(); - } + .forEach(networkClientDelegate::addAll); } public boolean isRunning() { @@ -269,6 +251,15 @@ private void closeInternal(final Duration timeout) { } } + private void sendUnsentRequests(final Timer timer) { + // Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until + // all requests have received a response. + while (!networkClientDelegate.unsentRequests().isEmpty() && timer.notExpired()) { + networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs()); + timer.update(); + } + } + void cleanup() { log.trace("Closing the consumer network thread"); Timer timer = time.timer(closeTimeout); @@ -277,7 +268,7 @@ void cleanup() { } catch (Exception e) { log.error("Unexpected error during shutdown. Proceed with closing.", e); } finally { - networkClientDelegate.awaitPendingRequests(timer); + sendUnsentRequests(timer); closeQuietly(requestManagers, "request managers"); closeQuietly(networkClientDelegate, "network client delegate"); closeQuietly(applicationEventProcessor, "application event processor"); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java index d99fa77f0f9e7..bef9f2c9cad5d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java @@ -16,13 +16,6 @@ */ package org.apache.kafka.clients.consumer.internals; -import java.util.List; -import java.util.Map; -import java.util.function.BiConsumer; -import java.util.function.Predicate; -import java.util.stream.Collectors; -import java.util.Optional; - import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.FetchSessionHandler; @@ -33,6 +26,13 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.BiConsumer; +import java.util.function.Predicate; +import java.util.stream.Collectors; + /** * {@code FetchRequestManager} is responsible for generating {@link FetchRequest} that represent the * {@link SubscriptionState#fetchablePartitions(Predicate)} based on the user's topic subscription/partition @@ -82,6 +82,7 @@ public PollResult poll(long currentTimeMs) { */ @Override public PollResult pollOnClose() { + // TODO: move the logic to poll to handle signal close return pollInternal( prepareCloseFetchSessionRequests(), this::handleCloseFetchSessionSuccess, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java index 810ca61c973d5..aec25985093a6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java @@ -126,11 +126,6 @@ public interface MembershipManager { */ CompletableFuture leaveGroup(); - /** - * Leaving the group when the user closes the consumer. - */ - void leaveGroupOnClose(); - /** * @return True if the member should send heartbeat to the coordinator without waiting for * the interval. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java index 631ce8b4abc2c..62fe07fd32cfb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java @@ -596,28 +596,6 @@ public CompletableFuture leaveGroup() { return leaveResult; } - /** - * When closing down the consumer. The partitions are already revoked by the application thread therefore we just - * need to transition the state to LEAVING and set the epoch to -1/-2. - */ - @Override - public void leaveGroupOnClose() { - if (state == MemberState.UNSUBSCRIBED || - state == MemberState.FATAL || - state == MemberState.LEAVING) { - return; - } - - if (state == MemberState.PREPARE_LEAVING) { - transitionToSendingLeaveGroup(); - return; - } - - transitionTo(MemberState.PREPARE_LEAVING); - transitionToSendingLeaveGroup(); - leaveGroupInProgress = Optional.of(CompletableFuture.completedFuture(null)); - } - /** * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or * onPartitionsLost. @@ -657,8 +635,9 @@ private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignme * Reset member epoch to the value required for the leave the group heartbeat request, and * transition to the {@link MemberState#LEAVING} state so that a heartbeat * request is sent out with it. + * Visible for testing. */ - private void transitionToSendingLeaveGroup() { + void transitionToSendingLeaveGroup() { if (state == MemberState.FATAL) { log.warn("Member {} with epoch {} won't send leave group request because it is in " + "FATAL state", memberId, memberEpoch); @@ -1015,8 +994,9 @@ private void addToAssignmentReadyToReconcile(Uuid topicId, String topicName, Sor * * @param revokedPartitions Partitions to revoke. * @return Future that will complete when the commit request and user callback completes. + * Visible for testing */ - private CompletableFuture revokePartitions(Set revokedPartitions) { + CompletableFuture revokePartitions(Set revokedPartitions) { log.info("Revoking previously assigned partitions {}", Utils.join(revokedPartitions, ", ")); logPausedPartitionsBeingRevoked(revokedPartitions); @@ -1043,7 +1023,7 @@ private CompletableFuture revokePartitions(Set revokedPart // retriable errors, so at this point we assume this is non-retriable, but // proceed with the revocation anyway). log.error("Commit request before revocation failed with non-retriable error. Will" + - " proceed with the revocation anyway.", error); + " proceed with the revocation anyway.", error); } // At this point we expect to be in a middle of a revocation triggered from RECONCILING @@ -1063,7 +1043,7 @@ private CompletableFuture revokePartitions(Set revokedPart userCallbackResult.whenComplete((callbackResult, callbackError) -> { if (callbackError != null) { log.error("onPartitionsRevoked callback invocation failed for partitions {}", - revokedPartitions, callbackError); + revokedPartitions, callbackError); revocationResult.completeExceptionally(callbackError); } else { revocationResult.complete(null); @@ -1146,7 +1126,8 @@ private CompletableFuture invokeOnPartitionsAssignedCallback(Set invokeOnPartitionsLostCallback(Set partitionsLost) { + // Visible for testing + CompletableFuture invokeOnPartitionsLostCallback(Set partitionsLost) { // This should not trigger the callback if partitionsLost is empty, to keep the current // behaviour. Optional listener = subscriptions.rebalanceListener(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index 10e0b345dcb71..4efbac8ccd856 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -130,21 +130,6 @@ public void poll(final long timeoutMs, final long currentTimeMs) { checkDisconnects(currentTimeMs); } - /** - * Block until all pending requests from the given node have finished. - */ - public void awaitPendingRequests(Timer timer) { - while (!unsentRequests().isEmpty() && timer.notExpired()) { - poll(timer.remainingMs(), timer.currentTimeMs()); - timer.update(); - } - - if (!unsentRequests.isEmpty()) { - log.warn("Close timed out with {} pending requests to coordinator, terminating client connections", - unsentRequests.size()); - } - } - /** * Tries to send the requests in the unsentRequest queue. If the request doesn't have an assigned node, it will * find the leastLoadedOne, and will be retried in the next {@code poll()}. If the request is expired, a @@ -325,11 +310,11 @@ Optional node() { @Override public String toString() { return "UnsentRequest{" + - "requestBuilder=" + requestBuilder + - ", handler=" + handler + - ", node=" + node + - ", timer=" + timer + - '}'; + "requestBuilder=" + requestBuilder + + ", handler=" + handler + + ", node=" + node + + ", timer=" + timer + + '}'; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java index 0cda89aedfb76..e37032835d69b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java @@ -77,4 +77,11 @@ default PollResult pollOnClose() { default long maximumTimeToWait(long currentTimeMs) { return Long.MAX_VALUE; } + + /** + * Signals the request manager that the consumer is closing to prepare for the proper actions to be taken. + */ + default void signalClose() { + return; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index f68d5d515f05a..4396df2785368 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -27,7 +27,7 @@ public enum Type { COMMIT, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE, LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, SUBSCRIPTION_CHANGE, UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED, - PREP_CLOSING + COMMIT_ON_CLOSE, LEAVE_ON_CLOSE } private final Type type; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 7620cfd7463d7..8ee9ee9420eb8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -23,7 +23,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; import org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager; import org.apache.kafka.clients.consumer.internals.MembershipManager; -import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate; import org.apache.kafka.clients.consumer.internals.RequestManagers; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.PartitionInfo; @@ -46,19 +45,15 @@ public class ApplicationEventProcessor extends EventProcessor private final Logger log; private final ConsumerMetadata metadata; private final RequestManagers requestManagers; - private final NetworkClientDelegate networkClientDelegate; public ApplicationEventProcessor(final LogContext logContext, final BlockingQueue applicationEventQueue, final RequestManagers requestManagers, - final ConsumerMetadata metadata, - final NetworkClientDelegate ncd) { + final ConsumerMetadata metadata) { super(logContext, applicationEventQueue); this.log = logContext.logger(ApplicationEventProcessor.class); this.requestManagers = requestManagers; this.metadata = metadata; - this.networkClientDelegate = ncd; - } /** @@ -121,8 +116,12 @@ public void process(ApplicationEvent event) { process((ConsumerRebalanceListenerCallbackCompletedEvent) event); return; - case PREP_CLOSING: - processPrepClosingEvent((ConsumerCloseApplicationEvent) event); + case COMMIT_ON_CLOSE: + process((CommitOnCloseApplicationEvent) event); + return; + + case LEAVE_ON_CLOSE: + process((LeaveOnCloseApplicationEvent) event); return; default: @@ -251,32 +250,21 @@ private void process(final ConsumerRebalanceListenerCallbackCompletedEvent event manager.consumerRebalanceListenerCallbackCompleted(event); } - private void processPrepClosingEvent(ConsumerCloseApplicationEvent event) { - switch (event.task()) { - case COMMIT: - log.debug("Sending unsent commit before closing."); - sendUnsentCommit(); - event.future().complete(null); - break; - case LEAVE_GROUP: - log.debug("Leaving group before closing"); - requestManagers.membershipManager.ifPresent(MembershipManager::leaveGroupOnClose); - event.future().complete(null); - break; - default: - log.warn("Invalid ConsumerCloseApplicationEvent task {}", event.task()); - event.future().completeExceptionally(new KafkaException("Invalid closing task.")); - } - } - - private void sendUnsentCommit() { + private void process(final CommitOnCloseApplicationEvent event) { if (!requestManagers.commitRequestManager.isPresent()) return; - NetworkClientDelegate.PollResult res = requestManagers.commitRequestManager.get().pollOnClose(); - if (res.unsentRequests.isEmpty()) + log.debug("Signal CommitRequestManager closing"); + requestManagers.commitRequestManager.get().signalClose(); } + + private void process(final LeaveOnCloseApplicationEvent event) { + if (!requestManagers.membershipManager.isPresent()) { + event.future().complete(null); return; - // NetworkThread will continue to poll the networkClientDelegate - networkClientDelegate.addAll(res); + } + log.debug("Leaving group before closing"); + CompletableFuture future = requestManagers.membershipManager.get().leaveGroup(); + // The future will be completed on heartbeat sent + event.chain(future); } /** @@ -285,20 +273,17 @@ private void sendUnsentCommit() { */ public static Supplier supplier(final LogContext logContext, final ConsumerMetadata metadata, - final Supplier ncdSupplier, final BlockingQueue applicationEventQueue, final Supplier requestManagersSupplier) { return new CachedSupplier() { @Override protected ApplicationEventProcessor create() { RequestManagers requestManagers = requestManagersSupplier.get(); - NetworkClientDelegate ncd = ncdSupplier.get(); return new ApplicationEventProcessor( logContext, applicationEventQueue, requestManagers, - metadata, - ncd + metadata ); } }; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java new file mode 100644 index 0000000000000..4cc07e945f9d2 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +public class CommitOnCloseApplicationEvent extends ApplicationEvent { + + public CommitOnCloseApplicationEvent() { + super(Type.COMMIT_ON_CLOSE); + } + + @Override + public String toString() { + return "CommitOnCloseApplicationEvent{" + + toStringBase() + + '}'; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerCloseApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerCloseApplicationEvent.java deleted file mode 100644 index 7fff3dfb8a0ff..0000000000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerCloseApplicationEvent.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.consumer.internals.events; - -public class ConsumerCloseApplicationEvent extends CompletableApplicationEvent { - public enum Task { - COMMIT, LEAVE_GROUP - } - - private final Task task; - private final long timeout; - - public ConsumerCloseApplicationEvent(final Task task, final long timeout) { - super(ApplicationEvent.Type.PREP_CLOSING); - this.task = task; - this.timeout = timeout; - } - - public Task task() { - return task; - } - - public long timeout() { - return timeout; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - ConsumerCloseApplicationEvent that = (ConsumerCloseApplicationEvent) o; - - return task == that.task; - } - - @Override - public int hashCode() { - return task.hashCode(); - } - - protected String toStringBase() { - return "task=" + task; - } - - @Override - public String toString() { - return "ConsumerCloseApplicationEvent{" + - toStringBase() + - '}'; - } -} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseApplicationEvent.java new file mode 100644 index 0000000000000..ee0b6ffa61c7d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseApplicationEvent.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +public class LeaveOnCloseApplicationEvent extends CompletableApplicationEvent { + public LeaveOnCloseApplicationEvent() { + super(Type.LEAVE_ON_CLOSE); + } + + @Override + public String toString() { + return "LeaveOnCloseApplicationEvent{" + + toStringBase() + + '}'; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 924b9792fa826..c316b7a181662 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -16,15 +16,6 @@ */ package org.apache.kafka.common.utils; -import java.lang.reflect.Modifier; -import java.nio.BufferUnderflowException; -import java.nio.ByteOrder; -import java.nio.file.StandardOpenOption; -import java.util.AbstractMap; -import java.util.EnumSet; -import java.util.Map.Entry; -import java.util.SortedSet; -import java.util.TreeSet; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.network.TransferableChannel; @@ -42,7 +33,10 @@ import java.io.StringWriter; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Modifier; +import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.nio.file.FileVisitResult; @@ -52,16 +46,22 @@ import java.nio.file.Paths; import java.nio.file.SimpleFileVisitor; import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; import java.nio.file.attribute.BasicFileAttributes; import java.text.DecimalFormat; import java.text.DecimalFormatSymbols; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.time.Instant; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Date; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -69,9 +69,12 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Properties; import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -84,9 +87,6 @@ import java.util.stream.Collector; import java.util.stream.Collectors; import java.util.stream.Stream; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; public final class Utils { @@ -1675,4 +1675,11 @@ public static Map entriesWithPrefix(Map map, String pr return result; } + /** + * A runnable that can throw checked exception. + */ + @FunctionalInterface + public interface ThrowingRunnable { + void run() throws Exception; + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventProcessorTest.java index b53250f438d2c..45641bbea64d8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventProcessorTest.java @@ -17,7 +17,8 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; -import org.apache.kafka.clients.consumer.internals.events.ConsumerCloseApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseApplicationEvent; import org.apache.kafka.common.utils.LogContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -26,11 +27,13 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; -import static org.mockito.ArgumentMatchers.any; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class ApplicationEventProcessorTest { private ApplicationEventProcessor processor; @@ -72,22 +75,25 @@ public void setup() { new LogContext(), applicationEventQueue, requestManagers, - metadata, - networkClientDelegate); + metadata + ); } @Test public void testPrepClosingCommitEvents() { List results = mockCommitResults(); doReturn(new NetworkClientDelegate.PollResult(100, results)).when(commitRequestManager).pollOnClose(); - processor.process(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.COMMIT, 0)); - verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class)); + processor.process(new CommitOnCloseApplicationEvent()); + verify(commitRequestManager).signalClose(); } @Test public void testPrepClosingLeaveGroupEvent() { - processor.process(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.LEAVE_GROUP, 0)); - verify(membershipManager).leaveGroupOnClose(); + LeaveOnCloseApplicationEvent event = new LeaveOnCloseApplicationEvent(); + when(membershipManager.leaveGroup()).thenReturn(CompletableFuture.completedFuture(null)); + processor.process(event); + verify(membershipManager).leaveGroup(); + assertTrue(event.future().isDone()); } private List mockCommitResults() { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index db52b15339535..fe7a1071821c0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -33,7 +33,6 @@ import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent; -import org.apache.kafka.clients.consumer.internals.events.ConsumerCloseApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.EventProcessor; @@ -86,6 +85,7 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -502,6 +502,43 @@ public void testVerifyApplicationEventOnShutdown() { verify(applicationEventHandler).add(any(CommitApplicationEvent.class)); } + @Test + public void testEnsureSubscribedPartitionsRevokedOnClosed() { + subscriptions.subscribe(singleton("topic"), Optional.empty()); + TopicPartition tp = new TopicPartition("topic", 0); + subscriptions.assignFromSubscribed(singleton(tp)); + consumer.close(Duration.ZERO); + assertTrue(subscriptions.assignedPartitions().isEmpty()); + verify(consumer).maybeRevokePartitions(any()); + } + + @Test + public void testWaitOnCompletionDoesNotThrow() { + AtomicReference exception = new AtomicReference<>(); + CompletableFuture future = CompletableFuture.completedFuture(null); + assertDoesNotThrow(() -> consumer.completeSilently(() -> { + future.get(0, TimeUnit.MILLISECONDS); + }, "test", exception)); + assertNull(exception.get()); + + assertDoesNotThrow(() -> consumer.completeSilently(() -> { + throw new KafkaException("Test exception"); + }, "test", exception)); + assertTrue(exception.get() instanceof KafkaException); + } + + @Test + public void testEnsureAutocommitSent() { + consumer.maybeAutoCommitSync(true, testBuilder.time.timer(100), null); + verify(consumer).completeSilently(any(), any(), any()); + } + + @Test + public void testEnsureautocommitNotSent() { + consumer.maybeAutoCommitSync(false, testBuilder.time.timer(100), null); + verify(consumer, never()).completeSilently(any(), any(), any()); + } + private void assertMockCommitCallbackInvoked(final Executable task, final MockCommitCallback callback, final Errors errors) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index d5a6f5002afe5..f8cecba8f99c7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -460,7 +460,7 @@ public void testOffsetFetchRequest_PartitionDataError(final Errors error, final } @Test - public void testPollOnClose() { + public void testSignalClose() { CommitRequestManager commitRequestManger = create(true, 100); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); @@ -468,7 +468,8 @@ public void testPollOnClose() { new OffsetAndMetadata(0)); commitRequestManger.addOffsetCommitRequest(offsets); - NetworkClientDelegate.PollResult res = commitRequestManger.pollOnClose(); + commitRequestManger.signalClose(); + NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); assertEquals(1, res.unsentRequests.size()); OffsetCommitRequestData data = (OffsetCommitRequestData) res.unsentRequests.get(0).requestBuilder().build().data(); assertEquals("topic", data.topics().get(0).name()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java index 45c530eba927b..2319f75a57eee 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java @@ -260,8 +260,8 @@ public ConsumerTestBuilder(Optional groupInfo, boolean enableA logContext, applicationEventQueue, requestManagers, - metadata, - networkClientDelegate) + metadata + ) ); ConsumerCoordinatorMetrics consumerCoordinatorMetrics = new ConsumerCoordinatorMetrics( subscriptions, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java index e080f63c1a81e..c3da9157acfa3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java @@ -315,7 +315,8 @@ public void testFetcherCloseClosesFetchSessionsInBroker() { // the close() method with a Timer will NOT send out the close session requests on close. The network // I/O logic is handled inside ConsumerNetworkThread.runAtClose, so we need to run that logic here. ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), networkClientDelegate, timer); - + // the network is polled during the last state of clean up. + networkClientDelegate.poll(time.timer(1)); // validate that closing the fetcher has sent a request with final epoch. 2 requests are sent, one for the // normal fetch earlier and another for the finish fetch here. verify(networkClientDelegate, times(2)).doSend(argument.capture(), any(Long.class)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java index 88058d1d54132..a098f6be36336 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent; @@ -66,6 +67,7 @@ import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -1402,6 +1404,27 @@ public void testHeartbeatSentOnStaledMember() { membershipManager.onHeartbeatRequestSent(); assertEquals(MemberState.JOINING, membershipManager.state()); } + public void testLeaveGroupOnShutdown() { + ConsumerRebalanceListener cb = mock(ConsumerRebalanceListener.class); + MembershipManagerImpl membershipManager = createMemberInStableState(); + subscriptionState.subscribe(Collections.singleton("topic1"), Optional.of(cb)); + subscriptionState.assignFromSubscribed(Collections.singleton(new TopicPartition("topic1", 0))); + dropAssignedPartitions(); + assertTrue(subscriptionState.assignedPartitions().isEmpty()); + CompletableFuture leaveGroupFuture = membershipManager.leaveGroup(); + membershipManager.onHeartbeatRequestSent(); + assertTrue(leaveGroupFuture.isDone()); + verify(membershipManager, never()).revokePartitions(anySet()); + verify(membershipManager, never()).invokeOnPartitionsLostCallback(anySet()); + verify(membershipManager).transitionToSendingLeaveGroup(); + + } + + private void dropAssignedPartitions() { + SortedSet droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR); + droppedPartitions.addAll(subscriptionState.assignedPartitions()); + subscriptionState.assignFromSubscribed(Collections.emptySet()); + } private MembershipManagerImpl mockMemberSuccessfullyReceivesAndAcksAssignment( Uuid topicId, String topicName, List partitions) { From e551bcc696edb9de76175468bfdaa1213ab6422d Mon Sep 17 00:00:00 2001 From: Philip Nee Date: Thu, 7 Dec 2023 15:11:19 -0800 Subject: [PATCH 03/14] clean up Update AsyncKafkaConsumer.java --- .../consumer/internals/AsyncKafkaConsumer.java | 14 +++++++------- .../consumer/internals/AsyncKafkaConsumerTest.java | 8 ++++---- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 469a5a038b718..3bf2071bc6620 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1245,10 +1245,10 @@ void prepareShutdown(final Timer timer, final AtomicReference firstEx if (!groupMetadata.isPresent()) return; maybeAutoCommitSync(autoCommitEnabled, timer, firstException); - timer.update(); applicationEventHandler.add(new CommitOnCloseApplicationEvent()); maybeRevokePartitions(firstException); - completeSilently( + timer.update(); + completeQuietly( () -> { applicationEventHandler.addAndGet(new LeaveOnCloseApplicationEvent(), timer); timer.update(); @@ -1264,7 +1264,7 @@ void maybeAutoCommitSync(final boolean shouldAutoCommit, final AtomicReference firstException) { if (!shouldAutoCommit) return; - completeSilently(() -> { + completeQuietly(() -> { Map allConsumed = subscriptions.allConsumed(); log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed); commitSync(allConsumed, Duration.ofMillis(timer.remainingMs())); @@ -1277,16 +1277,16 @@ void maybeRevokePartitions(final AtomicReference firstException) { if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty()) return; CompletableFuture revocationFuture = invokePartitionRevocationListener(); - completeSilently(revocationFuture::get, + completeQuietly(revocationFuture::get, "Failed revoking partitions of " + subscriptions.assignedPartitions(), firstException); subscriptions.assignFromSubscribed(Collections.emptySet()); } // Visible for testing - void completeSilently(final Utils.ThrowingRunnable function, - final String msg, - final AtomicReference firstException) { + void completeQuietly(final Utils.ThrowingRunnable function, + final String msg, + final AtomicReference firstException) { try { function.run(); } catch (TimeoutException e) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index fe7a1071821c0..aff7ac65044ae 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -516,12 +516,12 @@ public void testEnsureSubscribedPartitionsRevokedOnClosed() { public void testWaitOnCompletionDoesNotThrow() { AtomicReference exception = new AtomicReference<>(); CompletableFuture future = CompletableFuture.completedFuture(null); - assertDoesNotThrow(() -> consumer.completeSilently(() -> { + assertDoesNotThrow(() -> consumer.completeQuietly(() -> { future.get(0, TimeUnit.MILLISECONDS); }, "test", exception)); assertNull(exception.get()); - assertDoesNotThrow(() -> consumer.completeSilently(() -> { + assertDoesNotThrow(() -> consumer.completeQuietly(() -> { throw new KafkaException("Test exception"); }, "test", exception)); assertTrue(exception.get() instanceof KafkaException); @@ -530,13 +530,13 @@ public void testWaitOnCompletionDoesNotThrow() { @Test public void testEnsureAutocommitSent() { consumer.maybeAutoCommitSync(true, testBuilder.time.timer(100), null); - verify(consumer).completeSilently(any(), any(), any()); + verify(consumer).completeQuietly(any(), any(), any()); } @Test public void testEnsureautocommitNotSent() { consumer.maybeAutoCommitSync(false, testBuilder.time.timer(100), null); - verify(consumer, never()).completeSilently(any(), any(), any()); + verify(consumer, never()).completeQuietly(any(), any(), any()); } private void assertMockCommitCallbackInvoked(final Executable task, From 7df5695a4624a451cc2a41e371f7f5cba3bfd2c2 Mon Sep 17 00:00:00 2001 From: Philip Nee Date: Fri, 8 Dec 2023 09:13:18 -0800 Subject: [PATCH 04/14] pr comment --- .../consumer/internals/AsyncKafkaConsumer.java | 4 ++-- .../consumer/internals/ConsumerNetworkThread.java | 11 +++++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 3bf2071bc6620..df4d2f50d026f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -378,7 +378,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { clientTelemetryReporter); final Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, metadata, - applicationEventQueue, + applicationEventQueue, requestManagersSupplier); this.applicationEventHandler = applicationEventHandlerFactory.build( logContext, @@ -570,7 +570,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier( logContext, metadata, - applicationEventQueue, + applicationEventQueue, requestManagersSupplier ); this.applicationEventHandler = new ApplicationEventHandler(logContext, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index a49343f4dbefd..ae83cecb8d57d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -251,13 +251,16 @@ private void closeInternal(final Duration timeout) { } } + /** + * Check the unsent queue one last time and poll until all requests are sent or the timer runs out. + */ private void sendUnsentRequests(final Timer timer) { - // Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until - // all requests have received a response. - while (!networkClientDelegate.unsentRequests().isEmpty() && timer.notExpired()) { + if (networkClientDelegate.unsentRequests().isEmpty()) + return; + do { networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs()); timer.update(); - } + } while (timer.notExpired() && !networkClientDelegate.unsentRequests().isEmpty()); } void cleanup() { From da6715b443ec61f77e330c851cbc4f57a1a2ca32 Mon Sep 17 00:00:00 2001 From: Philip Nee Date: Mon, 11 Dec 2023 13:27:15 -0800 Subject: [PATCH 05/14] when consumer failed the rebalance callback, it doesn't need to send leave group ww --- .../consumer/internals/AsyncKafkaConsumer.java | 18 +++++++++++------- .../internals/AsyncKafkaConsumerTest.java | 17 ++++++++++++++++- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index df4d2f50d026f..6f2345794c816 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1246,10 +1246,10 @@ void prepareShutdown(final Timer timer, final AtomicReference firstEx return; maybeAutoCommitSync(autoCommitEnabled, timer, firstException); applicationEventHandler.add(new CommitOnCloseApplicationEvent()); - maybeRevokePartitions(firstException); timer.update(); completeQuietly( () -> { + maybeRevokePartitions(); applicationEventHandler.addAndGet(new LeaveOnCloseApplicationEvent(), timer); timer.update(); }, @@ -1273,14 +1273,18 @@ void maybeAutoCommitSync(final boolean shouldAutoCommit, } // Visible for testing - void maybeRevokePartitions(final AtomicReference firstException) { + void maybeRevokePartitions() { if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty()) return; - CompletableFuture revocationFuture = invokePartitionRevocationListener(); - completeQuietly(revocationFuture::get, - "Failed revoking partitions of " + subscriptions.assignedPartitions(), - firstException); - subscriptions.assignFromSubscribed(Collections.emptySet()); + try { + invokePartitionRevocationListener().get(); + } catch (ExecutionException e) { + throw new KafkaException(e.getCause()); + } catch (InterruptedException e) { + throw new InterruptException(e); + } finally { + subscriptions.assignFromSubscribed(Collections.emptySet()); + } } // Visible for testing diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index aff7ac65044ae..ccf78556cacf2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.clients.consumer.internals.events.EventProcessor; import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent; +import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent; @@ -509,7 +510,21 @@ public void testEnsureSubscribedPartitionsRevokedOnClosed() { subscriptions.assignFromSubscribed(singleton(tp)); consumer.close(Duration.ZERO); assertTrue(subscriptions.assignedPartitions().isEmpty()); - verify(consumer).maybeRevokePartitions(any()); + try { + verify(consumer).maybeRevokePartitions(); + } catch (Exception e) { + fail("Should not throw exception", e); + } + } + + @Test + public void testFailedPartitionRevocation() { + subscriptions.subscribe(singleton("topic"), Optional.empty()); + TopicPartition tp = new TopicPartition("topic", 0); + subscriptions.assignFromSubscribed(singleton(tp)); + doThrow(new KafkaException()).when(consumer).maybeRevokePartitions(); + assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO)); + verify(applicationEventHandler, never()).addAndGet(any(LeaveOnCloseApplicationEvent.class), any()); } @Test From 8ac91bc505ca88b23db0e114aafbfc7a8cd194a8 Mon Sep 17 00:00:00 2001 From: Philip Nee Date: Tue, 12 Dec 2023 12:03:02 -0800 Subject: [PATCH 06/14] changes based on discussion --- .../internals/AsyncKafkaConsumer.java | 22 ++++---------- .../events/ApplicationEventProcessor.java | 3 +- .../internals/AsyncKafkaConsumerTest.java | 30 +++++++++++++++---- .../internals/ConsumerNetworkThreadTest.java | 7 ----- .../internals/MembershipManagerImplTest.java | 18 ----------- 5 files changed, 32 insertions(+), 48 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 6f2345794c816..8960df48152c0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1246,12 +1246,10 @@ void prepareShutdown(final Timer timer, final AtomicReference firstEx return; maybeAutoCommitSync(autoCommitEnabled, timer, firstException); applicationEventHandler.add(new CommitOnCloseApplicationEvent()); - timer.update(); completeQuietly( () -> { maybeRevokePartitions(); applicationEventHandler.addAndGet(new LeaveOnCloseApplicationEvent(), timer); - timer.update(); }, "Failed to send leaveGroup heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException); swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback.", this::maybeInvokeCommitCallbacks, @@ -1277,11 +1275,12 @@ void maybeRevokePartitions() { if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty()) return; try { - invokePartitionRevocationListener().get(); - } catch (ExecutionException e) { - throw new KafkaException(e.getCause()); - } catch (InterruptedException e) { - throw new InterruptException(e); + SortedSet droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR); + droppedPartitions.addAll(subscriptions.assignedPartitions()); + if (subscriptions.rebalanceListener().isPresent()) + subscriptions.rebalanceListener().get().onPartitionsRevoked(droppedPartitions); + } catch (Exception e) { + throw new KafkaException(e); } finally { subscriptions.assignFromSubscribed(Collections.emptySet()); } @@ -1300,15 +1299,6 @@ void completeQuietly(final Utils.ThrowingRunnable function, } } - private CompletableFuture invokePartitionRevocationListener() { - SortedSet droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR); - droppedPartitions.addAll(subscriptions.assignedPartitions()); - if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) - return CompletableFuture.completedFuture(null); - // TODO: Invoke rebalanceListener via KAFKA-15276 - return CompletableFuture.completedFuture(null); - } - @Override public void wakeup() { wakeupTrigger.wakeup(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 8ee9ee9420eb8..026332016a71c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -254,7 +254,8 @@ private void process(final CommitOnCloseApplicationEvent event) { if (!requestManagers.commitRequestManager.isPresent()) return; log.debug("Signal CommitRequestManager closing"); - requestManagers.commitRequestManager.get().signalClose(); } + requestManagers.commitRequestManager.get().signalClose(); + } private void process(final LeaveOnCloseApplicationEvent event) { if (!requestManagers.membershipManager.isPresent()) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index ccf78556cacf2..74700445b6478 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -108,6 +108,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -518,17 +519,34 @@ public void testEnsureSubscribedPartitionsRevokedOnClosed() { } @Test - public void testFailedPartitionRevocation() { - subscriptions.subscribe(singleton("topic"), Optional.empty()); + public void testPartitionRevocationOnClose() { + MockRebalanceListener listener = new MockRebalanceListener(); + consumer.subscribe(singleton("topic"), listener); + TopicPartition tp = new TopicPartition("topic", 0); + subscriptions.assignFromSubscribed(singleton(tp)); + consumer.close(Duration.ZERO); + assertTrue(subscriptions.assignedPartitions().isEmpty()); + assertEquals(1, listener.revokedCount); + assertTrue(listener.revoked.contains(tp)); + } + + @Test + public void testFailedPartitionRevocationOnClose() { + // If rebalance listener failed to execute during close, we will skip sending leave group and proceed with + // closing the consumer. + ConsumerRebalanceListener listener = mock(ConsumerRebalanceListener.class); + subscriptions.subscribe(singleton("topic"), Optional.of(listener)); TopicPartition tp = new TopicPartition("topic", 0); subscriptions.assignFromSubscribed(singleton(tp)); - doThrow(new KafkaException()).when(consumer).maybeRevokePartitions(); + doThrow(new KafkaException()).when(listener).onPartitionsRevoked(eq(singleton(tp))); assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO)); verify(applicationEventHandler, never()).addAndGet(any(LeaveOnCloseApplicationEvent.class), any()); + verify(listener).onPartitionsRevoked(eq(singleton(tp))); + verify(subscriptions).assignFromSubscribed(eq(Collections.emptySet())); } @Test - public void testWaitOnCompletionDoesNotThrow() { + public void testCompleteQuietly() { AtomicReference exception = new AtomicReference<>(); CompletableFuture future = CompletableFuture.completedFuture(null); assertDoesNotThrow(() -> consumer.completeQuietly(() -> { @@ -543,13 +561,13 @@ public void testWaitOnCompletionDoesNotThrow() { } @Test - public void testEnsureAutocommitSent() { + public void testAutoCommitSyncEnabled() { consumer.maybeAutoCommitSync(true, testBuilder.time.timer(100), null); verify(consumer).completeQuietly(any(), any(), any()); } @Test - public void testEnsureautocommitNotSent() { + public void testAutoCommitSyncDisabled() { consumer.maybeAutoCommitSync(false, testBuilder.time.timer(100), null); verify(consumer, never()).completeQuietly(any(), any(), any()); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java index d7db239f3d0e2..e6bca05d8a151 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java @@ -127,7 +127,6 @@ public void testStartupAndTearDown() throws InterruptedException { TestUtils.waitForCondition(isStarted, "The consumer network thread did not start within " + DEFAULT_MAX_WAIT_MS + " ms"); - prepareTearDown(); consumerNetworkThread.close(Duration.ofMillis(DEFAULT_MAX_WAIT_MS)); TestUtils.waitForCondition(isClosed, @@ -287,12 +286,6 @@ void testEnsureEventsAreCompleted() { assertTrue(applicationEventsQueue.isEmpty()); } - private void prepareTearDown() { - Node node = metadata.fetch().nodes().get(0); - client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); - prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false); - } - private void prepareOffsetCommitRequest(final Map expectedOffsets, final Errors error, final boolean disconnected) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java index a098f6be36336..02ac52b21d526 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java @@ -17,12 +17,10 @@ package org.apache.kafka.clients.consumer.internals; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; @@ -67,7 +65,6 @@ import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -1404,21 +1401,6 @@ public void testHeartbeatSentOnStaledMember() { membershipManager.onHeartbeatRequestSent(); assertEquals(MemberState.JOINING, membershipManager.state()); } - public void testLeaveGroupOnShutdown() { - ConsumerRebalanceListener cb = mock(ConsumerRebalanceListener.class); - MembershipManagerImpl membershipManager = createMemberInStableState(); - subscriptionState.subscribe(Collections.singleton("topic1"), Optional.of(cb)); - subscriptionState.assignFromSubscribed(Collections.singleton(new TopicPartition("topic1", 0))); - dropAssignedPartitions(); - assertTrue(subscriptionState.assignedPartitions().isEmpty()); - CompletableFuture leaveGroupFuture = membershipManager.leaveGroup(); - membershipManager.onHeartbeatRequestSent(); - assertTrue(leaveGroupFuture.isDone()); - verify(membershipManager, never()).revokePartitions(anySet()); - verify(membershipManager, never()).invokeOnPartitionsLostCallback(anySet()); - verify(membershipManager).transitionToSendingLeaveGroup(); - - } private void dropAssignedPartitions() { SortedSet droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR); From 4424fbdad569f20cec831688a8f92ef6107da8ad Mon Sep 17 00:00:00 2001 From: Philip Nee Date: Tue, 12 Dec 2023 13:25:55 -0800 Subject: [PATCH 07/14] clean u p --- .../internals/AsyncKafkaConsumer.java | 3 ++- .../internals/AsyncKafkaConsumerTest.java | 19 +------------------ 2 files changed, 3 insertions(+), 19 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 8960df48152c0..595f6db5d6240 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1218,7 +1218,6 @@ private void close(Duration timeout, boolean swallowException) { if (applicationEventHandler != null) closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException); closeTimer.update(); - // Ensure all async commit callbacks are invoked closeQuietly(interceptors, "consumer interceptors", firstException); closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException); closeQuietly(metrics, "consumer metrics", firstException); @@ -1240,6 +1239,8 @@ private void close(Duration timeout, boolean swallowException) { * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: * 1. autocommit offsets * 2. revoke all partitions + * 3. if partition revocation completes successfully, send leave group + * 4. invoke all async commit callbacks if there is any */ void prepareShutdown(final Timer timer, final AtomicReference firstException) { if (!groupMetadata.isPresent()) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 74700445b6478..fed97610a3476 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -504,20 +504,6 @@ public void testVerifyApplicationEventOnShutdown() { verify(applicationEventHandler).add(any(CommitApplicationEvent.class)); } - @Test - public void testEnsureSubscribedPartitionsRevokedOnClosed() { - subscriptions.subscribe(singleton("topic"), Optional.empty()); - TopicPartition tp = new TopicPartition("topic", 0); - subscriptions.assignFromSubscribed(singleton(tp)); - consumer.close(Duration.ZERO); - assertTrue(subscriptions.assignedPartitions().isEmpty()); - try { - verify(consumer).maybeRevokePartitions(); - } catch (Exception e) { - fail("Should not throw exception", e); - } - } - @Test public void testPartitionRevocationOnClose() { MockRebalanceListener listener = new MockRebalanceListener(); @@ -1013,7 +999,6 @@ public void testBackgroundError() { final ErrorBackgroundEvent errorBackgroundEvent = new ErrorBackgroundEvent(expectedException); backgroundEventQueue.add(errorBackgroundEvent); consumer.assign(singletonList(new TopicPartition("topic", 0))); - final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO)); assertEquals(expectedException.getMessage(), exception.getMessage()); @@ -1032,7 +1017,6 @@ public void testMultipleBackgroundErrors() { final ErrorBackgroundEvent errorBackgroundEvent2 = new ErrorBackgroundEvent(expectedException2); backgroundEventQueue.add(errorBackgroundEvent2); consumer.assign(singletonList(new TopicPartition("topic", 0))); - final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO)); assertEquals(expectedException1.getMessage(), exception.getMessage()); @@ -1087,8 +1071,7 @@ public void testGroupIdNull() { @Test public void testGroupIdNotNullAndValid() { - final Properties props = requiredConsumerProperties(); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA"); + final Properties props = requiredConsumerPropertiesAndGroupId("consumerGroupA"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000); props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true); final ConsumerConfig config = new ConsumerConfig(props); From 8425110da2a25459b3b1f85a72adee9f68146274 Mon Sep 17 00:00:00 2001 From: Philip Nee Date: Tue, 12 Dec 2023 13:29:32 -0800 Subject: [PATCH 08/14] revert unneeded changes --- .../internals/ConsumerNetworkThread.java | 1 + .../internals/NetworkClientDelegate.java | 18 +++++++++--------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index ae83cecb8d57d..aa352cd68a22e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -138,6 +138,7 @@ void runOnce() { .map(networkClientDelegate::addAll) .reduce(MAX_POLL_TIMEOUT_MS, Math::min); networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs); + cachedMaximumTimeToWait = requestManagers.entries().stream() .filter(Optional::isPresent) .map(Optional::get) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index 4efbac8ccd856..141f5f955c8b5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -64,10 +64,10 @@ public class NetworkClientDelegate implements AutoCloseable { private final long retryBackoffMs; public NetworkClientDelegate( - final Time time, - final ConsumerConfig config, - final LogContext logContext, - final KafkaClient client) { + final Time time, + final ConsumerConfig config, + final LogContext logContext, + final KafkaClient client) { this.time = time; this.client = client; this.log = logContext.logger(getClass()); @@ -310,11 +310,11 @@ Optional node() { @Override public String toString() { return "UnsentRequest{" + - "requestBuilder=" + requestBuilder + - ", handler=" + handler + - ", node=" + node + - ", timer=" + timer + - '}'; + "requestBuilder=" + requestBuilder + + ", handler=" + handler + + ", node=" + node + + ", timer=" + timer + + '}'; } } From 74bdd6d7aaa81b30a2d40dd93caa97af39e81913 Mon Sep 17 00:00:00 2001 From: Philip Nee Date: Fri, 15 Dec 2023 10:24:43 -0800 Subject: [PATCH 09/14] conflict resolution --- .../clients/consumer/internals/AsyncKafkaConsumer.java | 2 ++ .../internals/events/ApplicationEventProcessor.java | 8 ++++++-- .../consumer/internals/ApplicationEventProcessorTest.java | 3 +-- .../consumer/internals/MembershipManagerImplTest.java | 1 + 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 595f6db5d6240..bb7f0b235fcaa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -44,6 +44,7 @@ import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; @@ -51,6 +52,7 @@ import org.apache.kafka.clients.consumer.internals.events.EventProcessor; import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent; +import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 026332016a71c..8359cb8320c95 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -258,12 +259,15 @@ private void process(final CommitOnCloseApplicationEvent event) { } private void process(final LeaveOnCloseApplicationEvent event) { - if (!requestManagers.membershipManager.isPresent()) { + if (!requestManagers.heartbeatRequestManager.isPresent()) { event.future().complete(null); return; } + MembershipManager membershipManager = + Objects.requireNonNull(requestManagers.heartbeatRequestManager.get().membershipManager(), "Expecting " + + "membership manager to be non-null"); log.debug("Leaving group before closing"); - CompletableFuture future = requestManagers.membershipManager.get().leaveGroup(); + CompletableFuture future = membershipManager.leaveGroup(); // The future will be completed on heartbeat sent event.chain(future); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventProcessorTest.java index 45641bbea64d8..da10dfc15c5e7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventProcessorTest.java @@ -69,8 +69,7 @@ public void setup() { fetchRequestManager, Optional.of(coordinatorRequestManager), Optional.of(commitRequestManager), - Optional.of(heartbeatRequestManager), - Optional.of(membershipManager)); + Optional.of(heartbeatRequestManager)); processor = new ApplicationEventProcessor( new LogContext(), applicationEventQueue, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java index 02ac52b21d526..a75b613a17ae8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; From 79029a2711564dec2fdedb2951ecff61b985aa9b Mon Sep 17 00:00:00 2001 From: Philip Nee Date: Fri, 15 Dec 2023 10:27:21 -0800 Subject: [PATCH 10/14] disable flaky tests Update ApplicationEventProcessorTest.java --- .../consumer/internals/ApplicationEventProcessorTest.java | 1 + .../kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java | 1 + 2 files changed, 2 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventProcessorTest.java index da10dfc15c5e7..7074542c3b030 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventProcessorTest.java @@ -89,6 +89,7 @@ public void testPrepClosingCommitEvents() { @Test public void testPrepClosingLeaveGroupEvent() { LeaveOnCloseApplicationEvent event = new LeaveOnCloseApplicationEvent(); + when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager); when(membershipManager.leaveGroup()).thenReturn(CompletableFuture.completedFuture(null)); processor.process(event); verify(membershipManager).leaveGroup(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index fed97610a3476..3575444380adb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -1034,6 +1034,7 @@ public void testGroupRemoteAssignorUnusedIfGroupIdUndefined() { } @Test + @Disabled("The test is flaky from time to time") public void testGroupRemoteAssignorUnusedInGenericProtocol() { final Properties props = requiredConsumerProperties(); props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA"); From 00dc40367e992c88034ba88cd44758c461dc26cc Mon Sep 17 00:00:00 2001 From: Philip Nee Date: Sat, 16 Dec 2023 17:52:18 -0800 Subject: [PATCH 11/14] Update MetadataTest.java Update MetadataTest.java From 4f271138815e332d0ae27e8f04b05bd06f67fe6f Mon Sep 17 00:00:00 2001 From: Philip Nee Date: Sun, 17 Dec 2023 16:55:27 -0800 Subject: [PATCH 12/14] Update ConsumerTestBuilder.java --- .../kafka/clients/consumer/internals/ConsumerTestBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java index 2319f75a57eee..ae7214c3f6aef 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java @@ -303,7 +303,7 @@ public ConsumerNetworkThreadTestBuilder(Optional groupInfo) { @Override public void close() { - consumerNetworkThread.close(); + consumerNetworkThread.close(Duration.ZERO); } } From 17862086b63864a8ad685f822e565da8626e8ea3 Mon Sep 17 00:00:00 2001 From: Philip Nee Date: Mon, 18 Dec 2023 16:35:21 -0800 Subject: [PATCH 13/14] rebase --- .../internals/AsyncKafkaConsumer.java | 13 +- .../internals/AsyncKafkaConsumerTest.java | 141 +++++++++++++++--- .../internals/ConsumerTestBuilder.java | 1 + 3 files changed, 128 insertions(+), 27 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index bb7f0b235fcaa..6f0eaef4eb5ab 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1265,12 +1265,15 @@ void maybeAutoCommitSync(final boolean shouldAutoCommit, final AtomicReference firstException) { if (!shouldAutoCommit) return; - completeQuietly(() -> { - Map allConsumed = subscriptions.allConsumed(); - log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed); + Map allConsumed = subscriptions.allConsumed(); + log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed); + try { commitSync(allConsumed, Duration.ofMillis(timer.remainingMs())); - timer.update(); - }, "Failed autoCommitSync with a timeout(ms)=" + timer.timeoutMs(), firstException); + } catch (Exception e) { + // consistent with async auto-commit failures, we do not propagate the exception + log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage()); + } + timer.update(); } // Visible for testing diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 3575444380adb..ac052d72f6ffb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.Metadata.LeaderAndEpoch; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -26,11 +27,14 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.RetriableCommitFailedException; +import org.apache.kafka.clients.consumer.RoundRobinAssignor; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; @@ -46,6 +50,7 @@ import org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.InvalidGroupIdException; @@ -53,10 +58,12 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.ListOffsetsRequest; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; @@ -91,6 +98,7 @@ import java.util.stream.Stream; import static java.util.Arrays.asList; +import static java.util.Collections.emptySet; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED; @@ -115,13 +123,16 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @SuppressWarnings("unchecked") public class AsyncKafkaConsumerTest { + private long retryBackoffMs = 100L; + private int defaultApiTimeoutMs = 1000; + private boolean autoCommitEnabled = true; + private AsyncKafkaConsumer consumer = null; private final Time time = new MockTime(1); @@ -134,7 +145,7 @@ public class AsyncKafkaConsumerTest { public void resetAll() { backgroundEventQueue.clear(); if (consumer != null) { - consumer.close(); + consumer.close(Duration.ZERO); } consumer = null; Mockito.framework().clearInlineMocks(); @@ -173,6 +184,35 @@ private AsyncKafkaConsumer newConsumer(ConsumerConfig config) { ); } + private AsyncKafkaConsumer newConsumer( + FetchBuffer fetchBuffer, + ConsumerInterceptors interceptors, + ConsumerRebalanceListenerInvoker rebalanceListenerInvoker, + SubscriptionState subscriptions, + List assignors, + String groupId, + String clientId) { + return new AsyncKafkaConsumer<>( + new LogContext(), + clientId, + new Deserializers<>(new StringDeserializer(), new StringDeserializer()), + fetchBuffer, + fetchCollector, + interceptors, + time, + applicationEventHandler, + backgroundEventQueue, + rebalanceListenerInvoker, + new Metrics(), + subscriptions, + metadata, + retryBackoffMs, + defaultApiTimeoutMs, + assignors, + groupId, + autoCommitEnabled); + } + @Test public void testSuccessfulStartupShutdown() { consumer = newConsumer(); @@ -298,11 +338,12 @@ public void testCommittedLeaderEpochUpdate() { public void testCommittedExceptionThrown() { consumer = newConsumer(); Map offsets = mockTopicPartitionOffset(); - when(applicationEventHandler.addAndGet(any(), any())).thenAnswer(invocation -> { - CompletableApplicationEvent event = invocation.getArgument(0); - assertInstanceOf(FetchCommittedOffsetsApplicationEvent.class, event); - throw new KafkaException("Test exception"); - }); + when(applicationEventHandler.addAndGet( + any(FetchCommittedOffsetsApplicationEvent.class), any())).thenAnswer(invocation -> { + CompletableApplicationEvent event = invocation.getArgument(0); + assertInstanceOf(FetchCommittedOffsetsApplicationEvent.class, event); + throw new KafkaException("Test exception"); + }); assertThrows(KafkaException.class, () -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000))); } @@ -450,18 +491,31 @@ public void testCommitSyncLeaderEpochUpdate() { @Test public void testCommitAsyncLeaderEpochUpdate() { - consumer = newConsumer(); - MockCommitCallback callback = new MockCommitCallback(); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + consumer = newConsumer( + mock(FetchBuffer.class), + new ConsumerInterceptors<>(Collections.emptyList()), + mock(ConsumerRebalanceListenerInvoker.class), + subscriptions, + singletonList(new RoundRobinAssignor()), + "group-id", + "client-id"); final TopicPartition t0 = new TopicPartition("t0", 2); final TopicPartition t1 = new TopicPartition("t0", 3); HashMap topicPartitionOffsets = new HashMap<>(); topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L, Optional.of(2), "")); topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L, Optional.of(1), "")); - + when(metadata.currentLeader(t0)).thenReturn( + new LeaderAndEpoch(Optional.of( + new Node(1, "host", 9000)), Optional.of(1))); + when(metadata.currentLeader(t1)).thenReturn( + new LeaderAndEpoch(Optional.of( + new Node(1, "host", 9000)), Optional.of(1))); consumer.assign(Arrays.asList(t0, t1)); consumer.seek(t0, 10); consumer.seek(t1, 20); + MockCommitCallback callback = new MockCommitCallback(); assertDoesNotThrow(() -> consumer.commitAsync(topicPartitionOffsets, callback)); verify(metadata).updateLastSeenEpochIfNewer(t0, 2); @@ -500,20 +554,28 @@ public void testVerifyApplicationEventOnShutdown() { consumer = newConsumer(); doReturn(null).when(applicationEventHandler).addAndGet(any(), any()); consumer.close(); - verify(applicationEventHandler, times(2)).addAndGet(any(ConsumerCloseApplicationEvent.class), any()); - verify(applicationEventHandler).add(any(CommitApplicationEvent.class)); + verify(applicationEventHandler).addAndGet(any(LeaveOnCloseApplicationEvent.class), any()); + verify(applicationEventHandler).add(any(CommitOnCloseApplicationEvent.class)); } @Test public void testPartitionRevocationOnClose() { MockRebalanceListener listener = new MockRebalanceListener(); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + consumer = newConsumer( + mock(FetchBuffer.class), + mock(ConsumerInterceptors.class), + mock(ConsumerRebalanceListenerInvoker.class), + subscriptions, + singletonList(new RoundRobinAssignor()), + "group-id", + "client-id"); + consumer.subscribe(singleton("topic"), listener); - TopicPartition tp = new TopicPartition("topic", 0); - subscriptions.assignFromSubscribed(singleton(tp)); + subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0))); consumer.close(Duration.ZERO); assertTrue(subscriptions.assignedPartitions().isEmpty()); assertEquals(1, listener.revokedCount); - assertTrue(listener.revoked.contains(tp)); } @Test @@ -521,6 +583,15 @@ public void testFailedPartitionRevocationOnClose() { // If rebalance listener failed to execute during close, we will skip sending leave group and proceed with // closing the consumer. ConsumerRebalanceListener listener = mock(ConsumerRebalanceListener.class); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + consumer = newConsumer( + mock(FetchBuffer.class), + new ConsumerInterceptors<>(Collections.emptyList()), + mock(ConsumerRebalanceListenerInvoker.class), + subscriptions, + singletonList(new RoundRobinAssignor()), + "group-id", + "client-id"); subscriptions.subscribe(singleton("topic"), Optional.of(listener)); TopicPartition tp = new TopicPartition("topic", 0); subscriptions.assignFromSubscribed(singleton(tp)); @@ -528,13 +599,14 @@ public void testFailedPartitionRevocationOnClose() { assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO)); verify(applicationEventHandler, never()).addAndGet(any(LeaveOnCloseApplicationEvent.class), any()); verify(listener).onPartitionsRevoked(eq(singleton(tp))); - verify(subscriptions).assignFromSubscribed(eq(Collections.emptySet())); + assertEquals(emptySet(), subscriptions.assignedPartitions()); } @Test public void testCompleteQuietly() { AtomicReference exception = new AtomicReference<>(); CompletableFuture future = CompletableFuture.completedFuture(null); + consumer = newConsumer(); assertDoesNotThrow(() -> consumer.completeQuietly(() -> { future.get(0, TimeUnit.MILLISECONDS); }, "test", exception)); @@ -548,14 +620,38 @@ public void testCompleteQuietly() { @Test public void testAutoCommitSyncEnabled() { - consumer.maybeAutoCommitSync(true, testBuilder.time.timer(100), null); - verify(consumer).completeQuietly(any(), any(), any()); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + consumer = newConsumer( + mock(FetchBuffer.class), + mock(ConsumerInterceptors.class), + mock(ConsumerRebalanceListenerInvoker.class), + subscriptions, + singletonList(new RoundRobinAssignor()), + "group-id", + "client-id"); + consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class)); + subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0))); + subscriptions.seek(new TopicPartition("topic", 0), 100); + consumer.maybeAutoCommitSync(true, time.timer(100), null); + verify(applicationEventHandler).add(any(CommitApplicationEvent.class)); } @Test public void testAutoCommitSyncDisabled() { - consumer.maybeAutoCommitSync(false, testBuilder.time.timer(100), null); - verify(consumer, never()).completeQuietly(any(), any(), any()); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + consumer = newConsumer( + mock(FetchBuffer.class), + mock(ConsumerInterceptors.class), + mock(ConsumerRebalanceListenerInvoker.class), + subscriptions, + singletonList(new RoundRobinAssignor()), + "group-id", + "client-id"); + consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class)); + subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0))); + subscriptions.seek(new TopicPartition("topic", 0), 100); + consumer.maybeAutoCommitSync(false, time.timer(100), null); + verify(applicationEventHandler, never()).add(any(CommitApplicationEvent.class)); } private void assertMockCommitCallbackInvoked(final Executable task, @@ -651,7 +747,9 @@ public void testBeginningOffsetsThrowsKafkaExceptionForUnderlyingExecutionFailur Set partitions = mockTopicPartitionOffset().keySet(); Throwable eventProcessingFailure = new KafkaException("Unexpected failure " + "processing List Offsets event"); - doThrow(eventProcessingFailure).when(applicationEventHandler).addAndGet(any(), any()); + doThrow(eventProcessingFailure).when(applicationEventHandler).addAndGet( + any(ListOffsetsApplicationEvent.class), + any()); Throwable consumerError = assertThrows(KafkaException.class, () -> consumer.beginningOffsets(partitions, Duration.ofMillis(1))); @@ -1034,7 +1132,6 @@ public void testGroupRemoteAssignorUnusedIfGroupIdUndefined() { } @Test - @Disabled("The test is flaky from time to time") public void testGroupRemoteAssignorUnusedInGenericProtocol() { final Properties props = requiredConsumerProperties(); props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA"); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java index ae7214c3f6aef..eb3cdd0926e7f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.utils.Timer; import java.io.Closeable; +import java.time.Duration; import java.util.HashMap; import java.util.Optional; import java.util.Properties; From 11a3ae633ac551efcf342aa9cb32bd57a1b44314 Mon Sep 17 00:00:00 2001 From: Philip Nee Date: Mon, 18 Dec 2023 20:46:29 -0800 Subject: [PATCH 14/14] disable flaky tests --- .../scala/integration/kafka/api/PlaintextConsumerTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index b3f1f0c458f7d..01e9aec257878 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -533,7 +533,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly")) def testExpandingTopicSubscriptions(quorum: String, groupProtocol: String): Unit = { val otherTopic = "other" val initialAssignment = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1)) @@ -548,7 +548,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly")) def testShrinkingTopicSubscriptions(quorum: String, groupProtocol: String): Unit = { val otherTopic = "other" createTopic(otherTopic, 2, brokerCount)