diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 2d5906320b6e3..6f0eaef4eb5ab 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -44,6 +44,7 @@ import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; @@ -51,6 +52,7 @@ import org.apache.kafka.clients.consumer.internals.events.EventProcessor; import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent; +import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent; @@ -83,7 +85,9 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; +import org.slf4j.event.Level; import java.net.InetSocketAddress; import java.time.Duration; @@ -94,10 +98,12 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; @@ -124,6 +130,7 @@ import static org.apache.kafka.common.utils.Utils.closeQuietly; import static org.apache.kafka.common.utils.Utils.isBlank; import static org.apache.kafka.common.utils.Utils.join; +import static org.apache.kafka.common.utils.Utils.swallow; /** * This {@link Consumer} implementation uses an {@link ApplicationEventHandler event handler} to process @@ -245,7 +252,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { private final ApplicationEventHandler applicationEventHandler; private final Time time; - private Optional groupMetadata; + private Optional groupMetadata = Optional.empty(); private final KafkaConsumerMetrics kafkaConsumerMetrics; private Logger log; private final String clientId; @@ -268,6 +275,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { private final Metrics metrics; private final long retryBackoffMs; private final int defaultApiTimeoutMs; + private final boolean autoCommitEnabled; private volatile boolean closed = false; private final List assignors; private final Optional clientTelemetryReporter; @@ -313,6 +321,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { GroupRebalanceConfig.ProtocolType.CONSUMER ); this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); + this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); LogContext logContext = createLogContext(config, groupRebalanceConfig); this.log = logContext.logger(getClass()); @@ -434,6 +443,51 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { } // Visible for testing + AsyncKafkaConsumer(LogContext logContext, + String clientId, + Deserializers deserializers, + FetchBuffer fetchBuffer, + FetchCollector fetchCollector, + ConsumerInterceptors interceptors, + Time time, + ApplicationEventHandler applicationEventHandler, + BlockingQueue backgroundEventQueue, + ConsumerRebalanceListenerInvoker rebalanceListenerInvoker, + Metrics metrics, + SubscriptionState subscriptions, + ConsumerMetadata metadata, + long retryBackoffMs, + int defaultApiTimeoutMs, + List assignors, + String groupId, + boolean autoCommitEnabled) { + this.log = logContext.logger(getClass()); + this.subscriptions = subscriptions; + this.clientId = clientId; + this.fetchBuffer = fetchBuffer; + this.fetchCollector = fetchCollector; + this.isolationLevel = IsolationLevel.READ_UNCOMMITTED; + this.interceptors = Objects.requireNonNull(interceptors); + this.time = time; + this.backgroundEventProcessor = new BackgroundEventProcessor( + logContext, + backgroundEventQueue, + applicationEventHandler, + rebalanceListenerInvoker + ); + this.metrics = metrics; + this.groupMetadata = initializeGroupMetadata(groupId, Optional.empty()); + this.metadata = metadata; + this.retryBackoffMs = retryBackoffMs; + this.defaultApiTimeoutMs = defaultApiTimeoutMs; + this.deserializers = deserializers; + this.applicationEventHandler = applicationEventHandler; + this.assignors = assignors; + this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer"); + this.clientTelemetryReporter = Optional.empty(); + this.autoCommitEnabled = autoCommitEnabled; + } + AsyncKafkaConsumer(LogContext logContext, Time time, ConsumerConfig config, @@ -446,6 +500,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { this.log = logContext.logger(getClass()); this.subscriptions = subscriptions; this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); + this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); this.fetchBuffer = new FetchBuffer(logContext); this.isolationLevel = IsolationLevel.READ_UNCOMMITTED; this.interceptors = new ConsumerInterceptors<>(Collections.emptyList()); @@ -1159,15 +1214,12 @@ private void close(Duration timeout, boolean swallowException) { final Timer closeTimer = time.timer(timeout); clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis())); closeTimer.update(); - + // Prepare shutting down the network thread + prepareShutdown(closeTimer, firstException); + closeTimer.update(); if (applicationEventHandler != null) - closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed to close application event handler with a timeout(ms)=" + closeTimer.remainingMs(), firstException); - - // Invoke all callbacks after the background thread exists in case if there are unsent async - // commits - maybeInvokeCommitCallbacks(); - - closeQuietly(fetchBuffer, "Failed to close the fetch buffer", firstException); + closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException); + closeTimer.update(); closeQuietly(interceptors, "consumer interceptors", firstException); closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException); closeQuietly(metrics, "consumer metrics", firstException); @@ -1185,6 +1237,74 @@ private void close(Duration timeout, boolean swallowException) { } } + /** + * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: + * 1. autocommit offsets + * 2. revoke all partitions + * 3. if partition revocation completes successfully, send leave group + * 4. invoke all async commit callbacks if there is any + */ + void prepareShutdown(final Timer timer, final AtomicReference firstException) { + if (!groupMetadata.isPresent()) + return; + maybeAutoCommitSync(autoCommitEnabled, timer, firstException); + applicationEventHandler.add(new CommitOnCloseApplicationEvent()); + completeQuietly( + () -> { + maybeRevokePartitions(); + applicationEventHandler.addAndGet(new LeaveOnCloseApplicationEvent(), timer); + }, + "Failed to send leaveGroup heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException); + swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback.", this::maybeInvokeCommitCallbacks, + firstException); + } + + // Visible for testing + void maybeAutoCommitSync(final boolean shouldAutoCommit, + final Timer timer, + final AtomicReference firstException) { + if (!shouldAutoCommit) + return; + Map allConsumed = subscriptions.allConsumed(); + log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed); + try { + commitSync(allConsumed, Duration.ofMillis(timer.remainingMs())); + } catch (Exception e) { + // consistent with async auto-commit failures, we do not propagate the exception + log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage()); + } + timer.update(); + } + + // Visible for testing + void maybeRevokePartitions() { + if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty()) + return; + try { + SortedSet droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR); + droppedPartitions.addAll(subscriptions.assignedPartitions()); + if (subscriptions.rebalanceListener().isPresent()) + subscriptions.rebalanceListener().get().onPartitionsRevoked(droppedPartitions); + } catch (Exception e) { + throw new KafkaException(e); + } finally { + subscriptions.assignFromSubscribed(Collections.emptySet()); + } + } + + // Visible for testing + void completeQuietly(final Utils.ThrowingRunnable function, + final String msg, + final AtomicReference firstException) { + try { + function.run(); + } catch (TimeoutException e) { + log.debug("Timeout expired before the {} operation could complete.", msg); + } catch (Exception e) { + firstException.compareAndSet(null, e); + } + } + @Override public void wakeup() { wakeupTrigger.wakeup(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index fea3b21d26552..6316ead793400 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -80,6 +80,7 @@ public class CommitRequestManager implements RequestManager { private final OptionalDouble jitter; private final boolean throwOnFetchStableOffsetUnsupported; final PendingRequests pendingRequests; + private boolean closing = false; public CommitRequestManager( final Time time, @@ -153,6 +154,10 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { if (!coordinatorRequestManager.coordinator().isPresent()) return EMPTY; + if (closing) { + return drainPendingOffsetCommitRequests(); + } + maybeAutoCommitAllConsumed(); if (!pendingRequests.hasUnsentRequests()) return EMPTY; @@ -165,6 +170,11 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { return new NetworkClientDelegate.PollResult(timeUntilNextPoll, requests); } + @Override + public void signalClose() { + closing = true; + } + /** * Returns the delay for which the application thread can safely wait before it should be responsive * to results from the request managers. For example, the subscription state can change when heartbeats @@ -315,12 +325,10 @@ public void resetAutoCommitTimer() { * Drains the inflight offsetCommits during shutdown because we want to make sure all pending commits are sent * before closing. */ - @Override - public NetworkClientDelegate.PollResult pollOnClose() { - if (!pendingRequests.hasUnsentRequests() || !coordinatorRequestManager.coordinator().isPresent()) + public NetworkClientDelegate.PollResult drainPendingOffsetCommitRequests() { + if (pendingRequests.unsentOffsetCommits.isEmpty()) return EMPTY; - - List requests = pendingRequests.drainOnClose(); + List requests = pendingRequests.drainPendingCommits(); return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, requests); } @@ -785,7 +793,7 @@ private void clearAll() { unsentOffsetFetches.clear(); } - private List drainOnClose() { + private List drainPendingCommits() { ArrayList res = new ArrayList<>(); res.addAll(unsentOffsetCommits.stream().map(OffsetCommitRequestState::toUnsentRequest).collect(Collectors.toList())); clearAll(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index cacc3e398aa9b..aa352cd68a22e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -16,12 +16,10 @@ */ package org.apache.kafka.clients.consumer.internals; -import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; -import org.apache.kafka.common.Node; import org.apache.kafka.common.internals.IdempotentCloser; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.utils.KafkaThread; @@ -33,14 +31,10 @@ import java.io.Closeable; import java.time.Duration; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; import java.util.function.Supplier; -import java.util.stream.Collectors; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS; import static org.apache.kafka.common.utils.Utils.closeQuietly; @@ -179,27 +173,11 @@ static void runAtClose(final Collection> requ final NetworkClientDelegate networkClientDelegate, final Timer timer) { // These are the optional outgoing requests at the - List pollResults = requestManagers.stream() + requestManagers.stream() .filter(Optional::isPresent) .map(Optional::get) .map(RequestManager::pollOnClose) - .collect(Collectors.toList()); - long pollWaitTimeMs = pollResults.stream() - .map(networkClientDelegate::addAll) - .reduce(MAX_POLL_TIMEOUT_MS, Math::min); - - List> requestFutures = pollResults.stream() - .flatMap(fads -> fads.unsentRequests.stream()) - .map(NetworkClientDelegate.UnsentRequest::future) - .collect(Collectors.toList()); - - // Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until - // all requests have received a response. - do { - pollWaitTimeMs = Math.min(pollWaitTimeMs, timer.remainingMs()); - networkClientDelegate.poll(pollWaitTimeMs, timer.currentTimeMs()); - timer.update(); - } while (timer.notExpired() && !requestFutures.stream().allMatch(Future::isDone)); + .forEach(networkClientDelegate::addAll); } public boolean isRunning() { @@ -274,80 +252,31 @@ private void closeInternal(final Duration timeout) { } } + /** + * Check the unsent queue one last time and poll until all requests are sent or the timer runs out. + */ + private void sendUnsentRequests(final Timer timer) { + if (networkClientDelegate.unsentRequests().isEmpty()) + return; + do { + networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs()); + timer.update(); + } while (timer.notExpired() && !networkClientDelegate.unsentRequests().isEmpty()); + } + void cleanup() { + log.trace("Closing the consumer network thread"); + Timer timer = time.timer(closeTimeout); try { - log.trace("Closing the consumer network thread"); - Timer timer = time.timer(closeTimeout); - maybeAutocommitOnClose(timer); runAtClose(requestManagers.entries(), networkClientDelegate, timer); - maybeLeaveGroup(timer); } catch (Exception e) { log.error("Unexpected error during shutdown. Proceed with closing.", e); } finally { + sendUnsentRequests(timer); closeQuietly(requestManagers, "request managers"); closeQuietly(networkClientDelegate, "network client delegate"); closeQuietly(applicationEventProcessor, "application event processor"); log.debug("Closed the consumer network thread"); } } - - /** - * We need to autocommit before shutting down the consumer. The method needs to first connect to the coordinator - * node to construct the closing requests. Then wait for all closing requests to finish before returning. The - * method is bounded by a closing timer. We will continue closing down the consumer if the requests cannot be - * completed in time. - */ - // Visible for testing - void maybeAutocommitOnClose(final Timer timer) { - if (!requestManagers.coordinatorRequestManager.isPresent()) - return; - - if (!requestManagers.commitRequestManager.isPresent()) { - log.error("Expecting a CommitRequestManager but the object was never initialized. Shutting down."); - return; - } - - if (!requestManagers.commitRequestManager.get().canAutoCommit()) { - return; - } - - ensureCoordinatorReady(timer); - NetworkClientDelegate.UnsentRequest autocommitRequest = - requestManagers.commitRequestManager.get().createCommitAllConsumedRequest(); - networkClientDelegate.add(autocommitRequest); - do { - long currentTimeMs = timer.currentTimeMs(); - ensureCoordinatorReady(timer); - networkClientDelegate.poll(timer.remainingMs(), currentTimeMs); - } while (timer.notExpired() && !autocommitRequest.future().isDone()); - } - - void maybeLeaveGroup(final Timer timer) { - // TODO: Leave group upon closing the consumer - } - - private void ensureCoordinatorReady(final Timer timer) { - while (!coordinatorReady() && timer.notExpired()) { - findCoordinatorSync(timer); - } - } - - private boolean coordinatorReady() { - CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.orElseThrow( - () -> new IllegalStateException("CoordinatorRequestManager uninitialized.")); - Optional coordinator = coordinatorRequestManager.coordinator(); - return coordinator.isPresent() && !networkClientDelegate.isUnavailable(coordinator.get()); - } - - private void findCoordinatorSync(final Timer timer) { - CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.orElseThrow( - () -> new IllegalStateException("CoordinatorRequestManager uninitialized.")); - NetworkClientDelegate.UnsentRequest request = coordinatorRequestManager.makeFindCoordinatorRequest(timer.currentTimeMs()); - networkClientDelegate.addAll(Collections.singletonList(request)); - CompletableFuture findCoordinatorRequest = request.future(); - while (timer.notExpired() && !findCoordinatorRequest.isDone()) { - networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs()); - timer.update(); - } - } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java index d99fa77f0f9e7..bef9f2c9cad5d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java @@ -16,13 +16,6 @@ */ package org.apache.kafka.clients.consumer.internals; -import java.util.List; -import java.util.Map; -import java.util.function.BiConsumer; -import java.util.function.Predicate; -import java.util.stream.Collectors; -import java.util.Optional; - import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.FetchSessionHandler; @@ -33,6 +26,13 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.BiConsumer; +import java.util.function.Predicate; +import java.util.stream.Collectors; + /** * {@code FetchRequestManager} is responsible for generating {@link FetchRequest} that represent the * {@link SubscriptionState#fetchablePartitions(Predicate)} based on the user's topic subscription/partition @@ -82,6 +82,7 @@ public PollResult poll(long currentTimeMs) { */ @Override public PollResult pollOnClose() { + // TODO: move the logic to poll to handle signal close return pollInternal( prepareCloseFetchSessionRequests(), this::handleCloseFetchSessionSuccess, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java index 1cdd26977d00a..62fe07fd32cfb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java @@ -114,14 +114,14 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource /** * TopicPartition comparator based on topic name and partition id. */ - private final static TopicPartitionComparator TOPIC_PARTITION_COMPARATOR = + final static TopicPartitionComparator TOPIC_PARTITION_COMPARATOR = new TopicPartitionComparator(); /** * TopicIdPartition comparator based on topic name and partition id (ignoring ID while sorting, * as this is sorted mainly for logging purposes). */ - private final static TopicIdPartitionComparator TOPIC_ID_PARTITION_COMPARATOR = + final static TopicIdPartitionComparator TOPIC_ID_PARTITION_COMPARATOR = new TopicIdPartitionComparator(); /** @@ -635,8 +635,9 @@ private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignme * Reset member epoch to the value required for the leave the group heartbeat request, and * transition to the {@link MemberState#LEAVING} state so that a heartbeat * request is sent out with it. + * Visible for testing. */ - private void transitionToSendingLeaveGroup() { + void transitionToSendingLeaveGroup() { if (state == MemberState.FATAL) { log.warn("Member {} with epoch {} won't send leave group request because it is in " + "FATAL state", memberId, memberEpoch); @@ -993,8 +994,9 @@ private void addToAssignmentReadyToReconcile(Uuid topicId, String topicName, Sor * * @param revokedPartitions Partitions to revoke. * @return Future that will complete when the commit request and user callback completes. + * Visible for testing */ - private CompletableFuture revokePartitions(Set revokedPartitions) { + CompletableFuture revokePartitions(Set revokedPartitions) { log.info("Revoking previously assigned partitions {}", Utils.join(revokedPartitions, ", ")); logPausedPartitionsBeingRevoked(revokedPartitions); @@ -1021,7 +1023,7 @@ private CompletableFuture revokePartitions(Set revokedPart // retriable errors, so at this point we assume this is non-retriable, but // proceed with the revocation anyway). log.error("Commit request before revocation failed with non-retriable error. Will" + - " proceed with the revocation anyway.", error); + " proceed with the revocation anyway.", error); } // At this point we expect to be in a middle of a revocation triggered from RECONCILING @@ -1041,7 +1043,7 @@ private CompletableFuture revokePartitions(Set revokedPart userCallbackResult.whenComplete((callbackResult, callbackError) -> { if (callbackError != null) { log.error("onPartitionsRevoked callback invocation failed for partitions {}", - revokedPartitions, callbackError); + revokedPartitions, callbackError); revocationResult.completeExceptionally(callbackError); } else { revocationResult.complete(null); @@ -1124,7 +1126,8 @@ private CompletableFuture invokeOnPartitionsAssignedCallback(Set invokeOnPartitionsLostCallback(Set partitionsLost) { + // Visible for testing + CompletableFuture invokeOnPartitionsLostCallback(Set partitionsLost) { // This should not trigger the callback if partitionsLost is empty, to keep the current // behaviour. Optional listener = subscriptions.rebalanceListener(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java index 0cda89aedfb76..e37032835d69b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java @@ -77,4 +77,11 @@ default PollResult pollOnClose() { default long maximumTimeToWait(long currentTimeMs) { return Long.MAX_VALUE; } + + /** + * Signals the request manager that the consumer is closing to prepare for the proper actions to be taken. + */ + default void signalClose() { + return; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index bc3c4f8d4a4a4..4396df2785368 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -26,7 +26,8 @@ public abstract class ApplicationEvent { public enum Type { COMMIT, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE, LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, SUBSCRIPTION_CHANGE, - UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED + UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED, + COMMIT_ON_CLOSE, LEAVE_ON_CLOSE } private final Type type; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index d7aade6e31a10..8359cb8320c95 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -116,6 +117,14 @@ public void process(ApplicationEvent event) { process((ConsumerRebalanceListenerCallbackCompletedEvent) event); return; + case COMMIT_ON_CLOSE: + process((CommitOnCloseApplicationEvent) event); + return; + + case LEAVE_ON_CLOSE: + process((LeaveOnCloseApplicationEvent) event); + return; + default: log.warn("Application event type " + event.type() + " was not expected"); } @@ -242,6 +251,27 @@ private void process(final ConsumerRebalanceListenerCallbackCompletedEvent event manager.consumerRebalanceListenerCallbackCompleted(event); } + private void process(final CommitOnCloseApplicationEvent event) { + if (!requestManagers.commitRequestManager.isPresent()) + return; + log.debug("Signal CommitRequestManager closing"); + requestManagers.commitRequestManager.get().signalClose(); + } + + private void process(final LeaveOnCloseApplicationEvent event) { + if (!requestManagers.heartbeatRequestManager.isPresent()) { + event.future().complete(null); + return; + } + MembershipManager membershipManager = + Objects.requireNonNull(requestManagers.heartbeatRequestManager.get().membershipManager(), "Expecting " + + "membership manager to be non-null"); + log.debug("Leaving group before closing"); + CompletableFuture future = membershipManager.leaveGroup(); + // The future will be completed on heartbeat sent + event.chain(future); + } + /** * Creates a {@link Supplier} for deferred creation during invocation by * {@link ConsumerNetworkThread}. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java new file mode 100644 index 0000000000000..4cc07e945f9d2 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +public class CommitOnCloseApplicationEvent extends ApplicationEvent { + + public CommitOnCloseApplicationEvent() { + super(Type.COMMIT_ON_CLOSE); + } + + @Override + public String toString() { + return "CommitOnCloseApplicationEvent{" + + toStringBase() + + '}'; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseApplicationEvent.java new file mode 100644 index 0000000000000..ee0b6ffa61c7d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseApplicationEvent.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +public class LeaveOnCloseApplicationEvent extends CompletableApplicationEvent { + public LeaveOnCloseApplicationEvent() { + super(Type.LEAVE_ON_CLOSE); + } + + @Override + public String toString() { + return "LeaveOnCloseApplicationEvent{" + + toStringBase() + + '}'; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 924b9792fa826..c316b7a181662 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -16,15 +16,6 @@ */ package org.apache.kafka.common.utils; -import java.lang.reflect.Modifier; -import java.nio.BufferUnderflowException; -import java.nio.ByteOrder; -import java.nio.file.StandardOpenOption; -import java.util.AbstractMap; -import java.util.EnumSet; -import java.util.Map.Entry; -import java.util.SortedSet; -import java.util.TreeSet; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.network.TransferableChannel; @@ -42,7 +33,10 @@ import java.io.StringWriter; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Modifier; +import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.nio.file.FileVisitResult; @@ -52,16 +46,22 @@ import java.nio.file.Paths; import java.nio.file.SimpleFileVisitor; import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; import java.nio.file.attribute.BasicFileAttributes; import java.text.DecimalFormat; import java.text.DecimalFormatSymbols; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.time.Instant; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Date; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -69,9 +69,12 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Properties; import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -84,9 +87,6 @@ import java.util.stream.Collector; import java.util.stream.Collectors; import java.util.stream.Stream; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; public final class Utils { @@ -1675,4 +1675,11 @@ public static Map entriesWithPrefix(Map map, String pr return result; } + /** + * A runnable that can throw checked exception. + */ + @FunctionalInterface + public interface ThrowingRunnable { + void run() throws Exception; + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventProcessorTest.java new file mode 100644 index 0000000000000..7074542c3b030 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventProcessorTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseApplicationEvent; +import org.apache.kafka.common.utils.LogContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class ApplicationEventProcessorTest { + private ApplicationEventProcessor processor; + private BlockingQueue applicationEventQueue = mock(BlockingQueue.class); + private RequestManagers requestManagers; + private ConsumerMetadata metadata = mock(ConsumerMetadata.class); + private NetworkClientDelegate networkClientDelegate = mock(NetworkClientDelegate.class); + private OffsetsRequestManager offsetRequestManager; + private OffsetsRequestManager offsetsRequestManager; + private TopicMetadataRequestManager topicMetadataRequestManager; + private FetchRequestManager fetchRequestManager; + private CoordinatorRequestManager coordinatorRequestManager; + private CommitRequestManager commitRequestManager; + private HeartbeatRequestManager heartbeatRequestManager; + private MembershipManager membershipManager; + + @BeforeEach + @SuppressWarnings("unchecked") + public void setup() { + LogContext logContext = new LogContext(); + offsetRequestManager = mock(OffsetsRequestManager.class); + offsetsRequestManager = mock(OffsetsRequestManager.class); + topicMetadataRequestManager = mock(TopicMetadataRequestManager.class); + fetchRequestManager = mock(FetchRequestManager.class); + coordinatorRequestManager = mock(CoordinatorRequestManager.class); + commitRequestManager = mock(CommitRequestManager.class); + heartbeatRequestManager = mock(HeartbeatRequestManager.class); + membershipManager = mock(MembershipManager.class); + requestManagers = new RequestManagers( + logContext, + offsetsRequestManager, + topicMetadataRequestManager, + fetchRequestManager, + Optional.of(coordinatorRequestManager), + Optional.of(commitRequestManager), + Optional.of(heartbeatRequestManager)); + processor = new ApplicationEventProcessor( + new LogContext(), + applicationEventQueue, + requestManagers, + metadata + ); + } + + @Test + public void testPrepClosingCommitEvents() { + List results = mockCommitResults(); + doReturn(new NetworkClientDelegate.PollResult(100, results)).when(commitRequestManager).pollOnClose(); + processor.process(new CommitOnCloseApplicationEvent()); + verify(commitRequestManager).signalClose(); + } + + @Test + public void testPrepClosingLeaveGroupEvent() { + LeaveOnCloseApplicationEvent event = new LeaveOnCloseApplicationEvent(); + when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager); + when(membershipManager.leaveGroup()).thenReturn(CompletableFuture.completedFuture(null)); + processor.process(event); + verify(membershipManager).leaveGroup(); + assertTrue(event.future().isDone()); + } + + private List mockCommitResults() { + return Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class)); + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 19734698afba9..ac052d72f6ffb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.Metadata.LeaderAndEpoch; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -26,11 +27,14 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.RetriableCommitFailedException; +import org.apache.kafka.clients.consumer.RoundRobinAssignor; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; @@ -38,6 +42,7 @@ import org.apache.kafka.clients.consumer.internals.events.EventProcessor; import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent; +import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent; @@ -45,6 +50,7 @@ import org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.InvalidGroupIdException; @@ -52,10 +58,12 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.ListOffsetsRequest; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; @@ -85,10 +93,12 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.Arrays.asList; +import static java.util.Collections.emptySet; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED; @@ -106,6 +116,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -118,6 +129,10 @@ @SuppressWarnings("unchecked") public class AsyncKafkaConsumerTest { + private long retryBackoffMs = 100L; + private int defaultApiTimeoutMs = 1000; + private boolean autoCommitEnabled = true; + private AsyncKafkaConsumer consumer = null; private final Time time = new MockTime(1); @@ -130,7 +145,7 @@ public class AsyncKafkaConsumerTest { public void resetAll() { backgroundEventQueue.clear(); if (consumer != null) { - consumer.close(); + consumer.close(Duration.ZERO); } consumer = null; Mockito.framework().clearInlineMocks(); @@ -169,6 +184,35 @@ private AsyncKafkaConsumer newConsumer(ConsumerConfig config) { ); } + private AsyncKafkaConsumer newConsumer( + FetchBuffer fetchBuffer, + ConsumerInterceptors interceptors, + ConsumerRebalanceListenerInvoker rebalanceListenerInvoker, + SubscriptionState subscriptions, + List assignors, + String groupId, + String clientId) { + return new AsyncKafkaConsumer<>( + new LogContext(), + clientId, + new Deserializers<>(new StringDeserializer(), new StringDeserializer()), + fetchBuffer, + fetchCollector, + interceptors, + time, + applicationEventHandler, + backgroundEventQueue, + rebalanceListenerInvoker, + new Metrics(), + subscriptions, + metadata, + retryBackoffMs, + defaultApiTimeoutMs, + assignors, + groupId, + autoCommitEnabled); + } + @Test public void testSuccessfulStartupShutdown() { consumer = newConsumer(); @@ -294,11 +338,12 @@ public void testCommittedLeaderEpochUpdate() { public void testCommittedExceptionThrown() { consumer = newConsumer(); Map offsets = mockTopicPartitionOffset(); - when(applicationEventHandler.addAndGet(any(), any())).thenAnswer(invocation -> { - CompletableApplicationEvent event = invocation.getArgument(0); - assertInstanceOf(FetchCommittedOffsetsApplicationEvent.class, event); - throw new KafkaException("Test exception"); - }); + when(applicationEventHandler.addAndGet( + any(FetchCommittedOffsetsApplicationEvent.class), any())).thenAnswer(invocation -> { + CompletableApplicationEvent event = invocation.getArgument(0); + assertInstanceOf(FetchCommittedOffsetsApplicationEvent.class, event); + throw new KafkaException("Test exception"); + }); assertThrows(KafkaException.class, () -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000))); } @@ -340,7 +385,6 @@ public void testWakeupAfterEmptyFetch() { assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); } - @Test public void testWakeupAfterNonEmptyFetch() { consumer = newConsumer(); final String topicName = "foo"; @@ -447,16 +491,31 @@ public void testCommitSyncLeaderEpochUpdate() { @Test public void testCommitAsyncLeaderEpochUpdate() { - consumer = newConsumer(); - MockCommitCallback callback = new MockCommitCallback(); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + consumer = newConsumer( + mock(FetchBuffer.class), + new ConsumerInterceptors<>(Collections.emptyList()), + mock(ConsumerRebalanceListenerInvoker.class), + subscriptions, + singletonList(new RoundRobinAssignor()), + "group-id", + "client-id"); final TopicPartition t0 = new TopicPartition("t0", 2); final TopicPartition t1 = new TopicPartition("t0", 3); HashMap topicPartitionOffsets = new HashMap<>(); topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L, Optional.of(2), "")); topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L, Optional.of(1), "")); - + when(metadata.currentLeader(t0)).thenReturn( + new LeaderAndEpoch(Optional.of( + new Node(1, "host", 9000)), Optional.of(1))); + when(metadata.currentLeader(t1)).thenReturn( + new LeaderAndEpoch(Optional.of( + new Node(1, "host", 9000)), Optional.of(1))); consumer.assign(Arrays.asList(t0, t1)); + consumer.seek(t0, 10); + consumer.seek(t1, 20); + MockCommitCallback callback = new MockCommitCallback(); assertDoesNotThrow(() -> consumer.commitAsync(topicPartitionOffsets, callback)); verify(metadata).updateLastSeenEpochIfNewer(t0, 2); @@ -490,6 +549,111 @@ public void testEnsureShutdownExecutedCommitAsyncCallbacks() { null); } + @Test + public void testVerifyApplicationEventOnShutdown() { + consumer = newConsumer(); + doReturn(null).when(applicationEventHandler).addAndGet(any(), any()); + consumer.close(); + verify(applicationEventHandler).addAndGet(any(LeaveOnCloseApplicationEvent.class), any()); + verify(applicationEventHandler).add(any(CommitOnCloseApplicationEvent.class)); + } + + @Test + public void testPartitionRevocationOnClose() { + MockRebalanceListener listener = new MockRebalanceListener(); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + consumer = newConsumer( + mock(FetchBuffer.class), + mock(ConsumerInterceptors.class), + mock(ConsumerRebalanceListenerInvoker.class), + subscriptions, + singletonList(new RoundRobinAssignor()), + "group-id", + "client-id"); + + consumer.subscribe(singleton("topic"), listener); + subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0))); + consumer.close(Duration.ZERO); + assertTrue(subscriptions.assignedPartitions().isEmpty()); + assertEquals(1, listener.revokedCount); + } + + @Test + public void testFailedPartitionRevocationOnClose() { + // If rebalance listener failed to execute during close, we will skip sending leave group and proceed with + // closing the consumer. + ConsumerRebalanceListener listener = mock(ConsumerRebalanceListener.class); + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + consumer = newConsumer( + mock(FetchBuffer.class), + new ConsumerInterceptors<>(Collections.emptyList()), + mock(ConsumerRebalanceListenerInvoker.class), + subscriptions, + singletonList(new RoundRobinAssignor()), + "group-id", + "client-id"); + subscriptions.subscribe(singleton("topic"), Optional.of(listener)); + TopicPartition tp = new TopicPartition("topic", 0); + subscriptions.assignFromSubscribed(singleton(tp)); + doThrow(new KafkaException()).when(listener).onPartitionsRevoked(eq(singleton(tp))); + assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO)); + verify(applicationEventHandler, never()).addAndGet(any(LeaveOnCloseApplicationEvent.class), any()); + verify(listener).onPartitionsRevoked(eq(singleton(tp))); + assertEquals(emptySet(), subscriptions.assignedPartitions()); + } + + @Test + public void testCompleteQuietly() { + AtomicReference exception = new AtomicReference<>(); + CompletableFuture future = CompletableFuture.completedFuture(null); + consumer = newConsumer(); + assertDoesNotThrow(() -> consumer.completeQuietly(() -> { + future.get(0, TimeUnit.MILLISECONDS); + }, "test", exception)); + assertNull(exception.get()); + + assertDoesNotThrow(() -> consumer.completeQuietly(() -> { + throw new KafkaException("Test exception"); + }, "test", exception)); + assertTrue(exception.get() instanceof KafkaException); + } + + @Test + public void testAutoCommitSyncEnabled() { + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + consumer = newConsumer( + mock(FetchBuffer.class), + mock(ConsumerInterceptors.class), + mock(ConsumerRebalanceListenerInvoker.class), + subscriptions, + singletonList(new RoundRobinAssignor()), + "group-id", + "client-id"); + consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class)); + subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0))); + subscriptions.seek(new TopicPartition("topic", 0), 100); + consumer.maybeAutoCommitSync(true, time.timer(100), null); + verify(applicationEventHandler).add(any(CommitApplicationEvent.class)); + } + + @Test + public void testAutoCommitSyncDisabled() { + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + consumer = newConsumer( + mock(FetchBuffer.class), + mock(ConsumerInterceptors.class), + mock(ConsumerRebalanceListenerInvoker.class), + subscriptions, + singletonList(new RoundRobinAssignor()), + "group-id", + "client-id"); + consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class)); + subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0))); + subscriptions.seek(new TopicPartition("topic", 0), 100); + consumer.maybeAutoCommitSync(false, time.timer(100), null); + verify(applicationEventHandler, never()).add(any(CommitApplicationEvent.class)); + } + private void assertMockCommitCallbackInvoked(final Executable task, final MockCommitCallback callback, final Errors errors) { @@ -583,7 +747,9 @@ public void testBeginningOffsetsThrowsKafkaExceptionForUnderlyingExecutionFailur Set partitions = mockTopicPartitionOffset().keySet(); Throwable eventProcessingFailure = new KafkaException("Unexpected failure " + "processing List Offsets event"); - doThrow(eventProcessingFailure).when(applicationEventHandler).addAndGet(any(), any()); + doThrow(eventProcessingFailure).when(applicationEventHandler).addAndGet( + any(ListOffsetsApplicationEvent.class), + any()); Throwable consumerError = assertThrows(KafkaException.class, () -> consumer.beginningOffsets(partitions, Duration.ofMillis(1))); @@ -657,7 +823,7 @@ public void testOffsetsForTimesWithZeroTimeout() { Map result = assertDoesNotThrow(() -> consumer.offsetsForTimes(timestampToSearch, - Duration.ofMillis(0))); + Duration.ZERO)); assertEquals(expectedResult, result); verify(applicationEventHandler, never()).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); @@ -931,7 +1097,6 @@ public void testBackgroundError() { final ErrorBackgroundEvent errorBackgroundEvent = new ErrorBackgroundEvent(expectedException); backgroundEventQueue.add(errorBackgroundEvent); consumer.assign(singletonList(new TopicPartition("topic", 0))); - final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO)); assertEquals(expectedException.getMessage(), exception.getMessage()); @@ -950,7 +1115,6 @@ public void testMultipleBackgroundErrors() { final ErrorBackgroundEvent errorBackgroundEvent2 = new ErrorBackgroundEvent(expectedException2); backgroundEventQueue.add(errorBackgroundEvent2); consumer.assign(singletonList(new TopicPartition("topic", 0))); - final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO)); assertEquals(expectedException1.getMessage(), exception.getMessage()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index 74d69d8d42aeb..f8cecba8f99c7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetCommitResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -458,6 +459,22 @@ public void testOffsetFetchRequest_PartitionDataError(final Errors error, final testNonRetriable(Collections.singletonList(future)); } + @Test + public void testSignalClose() { + CommitRequestManager commitRequestManger = create(true, 100); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode)); + + Map offsets = Collections.singletonMap(new TopicPartition("topic", 1), + new OffsetAndMetadata(0)); + + commitRequestManger.addOffsetCommitRequest(offsets); + commitRequestManger.signalClose(); + NetworkClientDelegate.PollResult res = commitRequestManger.poll(time.milliseconds()); + assertEquals(1, res.unsentRequests.size()); + OffsetCommitRequestData data = (OffsetCommitRequestData) res.unsentRequests.get(0).requestBuilder().build().data(); + assertEquals("topic", data.topics().get(0).name()); + } + private static void assertEmptyPendingRequests(CommitRequestManager commitRequestManger) { assertTrue(commitRequestManger.pendingRequests.inflightOffsetFetches.isEmpty()); assertTrue(commitRequestManger.pendingRequests.unsentOffsetFetches.isEmpty()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java index 0eefb59604f3a..e6bca05d8a151 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java @@ -55,8 +55,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; -import static java.util.Collections.singleton; -import static java.util.Collections.singletonMap; import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS; import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; @@ -85,6 +83,8 @@ public class ConsumerNetworkThreadTest { private OffsetsRequestManager offsetsRequestManager; private CommitRequestManager commitRequestManager; private CoordinatorRequestManager coordinatorRequestManager; + private HeartbeatRequestManager heartbeatRequestManager; + private MembershipManager memberhipsManager; private ConsumerNetworkThread consumerNetworkThread; private MockClient client; private SubscriptionState subscriptions; @@ -101,6 +101,8 @@ public void setup() { commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); offsetsRequestManager = testBuilder.offsetsRequestManager; coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); + heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new); + memberhipsManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new); consumerNetworkThread = testBuilder.consumerNetworkThread; subscriptions = testBuilder.subscriptions; consumerNetworkThread.initializeResources(); @@ -125,7 +127,6 @@ public void testStartupAndTearDown() throws InterruptedException { TestUtils.waitForCondition(isStarted, "The consumer network thread did not start within " + DEFAULT_MAX_WAIT_MS + " ms"); - prepareTearDown(); consumerNetworkThread.close(Duration.ofMillis(DEFAULT_MAX_WAIT_MS)); TestUtils.waitForCondition(isClosed, @@ -285,44 +286,6 @@ void testEnsureEventsAreCompleted() { assertTrue(applicationEventsQueue.isEmpty()); } - @Test - void testCoordinatorConnectionOnClose() { - TopicPartition tp = new TopicPartition("topic", 0); - subscriptions.assignFromUser(singleton(new TopicPartition("topic", 0))); - subscriptions.seekUnvalidated(tp, new SubscriptionState.FetchPosition(100)); - Node node = metadata.fetch().nodes().get(0); - coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds()); - client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); - prepareOffsetCommitRequest(singletonMap(tp, 100L), Errors.NONE, false); - consumerNetworkThread.cleanup(); - assertTrue(coordinatorRequestManager.coordinator().isPresent()); - assertFalse(client.hasPendingResponses()); - assertFalse(client.hasInFlightRequests()); - } - - @Test - void testAutoCommitOnClose() { - TopicPartition tp = new TopicPartition("topic", 0); - Node node = metadata.fetch().nodes().get(0); - subscriptions.assignFromUser(singleton(tp)); - subscriptions.seek(tp, 100); - coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds()); - client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); - prepareOffsetCommitRequest(singletonMap(tp, 100L), Errors.NONE, false); - consumerNetworkThread.maybeAutocommitOnClose(time.timer(1000)); - assertTrue(coordinatorRequestManager.coordinator().isPresent()); - verify(commitRequestManager).createCommitAllConsumedRequest(); - - assertFalse(client.hasPendingResponses()); - assertFalse(client.hasInFlightRequests()); - } - - private void prepareTearDown() { - Node node = metadata.fetch().nodes().get(0); - client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); - prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false); - } - private void prepareOffsetCommitRequest(final Map expectedOffsets, final Errors error, final boolean disconnected) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java index f0d42a8ed3b2c..eb3cdd0926e7f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.utils.Timer; import java.io.Closeable; +import java.time.Duration; import java.util.HashMap; import java.util.Optional; import java.util.Properties; @@ -260,7 +261,8 @@ public ConsumerTestBuilder(Optional groupInfo, boolean enableA logContext, applicationEventQueue, requestManagers, - metadata) + metadata + ) ); ConsumerCoordinatorMetrics consumerCoordinatorMetrics = new ConsumerCoordinatorMetrics( subscriptions, @@ -302,7 +304,7 @@ public ConsumerNetworkThreadTestBuilder(Optional groupInfo) { @Override public void close() { - consumerNetworkThread.close(); + consumerNetworkThread.close(Duration.ZERO); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java index e080f63c1a81e..c3da9157acfa3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java @@ -315,7 +315,8 @@ public void testFetcherCloseClosesFetchSessionsInBroker() { // the close() method with a Timer will NOT send out the close session requests on close. The network // I/O logic is handled inside ConsumerNetworkThread.runAtClose, so we need to run that logic here. ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), networkClientDelegate, timer); - + // the network is polled during the last state of clean up. + networkClientDelegate.poll(time.timer(1)); // validate that closing the fetcher has sent a request with final epoch. 2 requests are sent, one for the // normal fetch earlier and another for the finish fetch here. verify(networkClientDelegate, times(2)).doSend(argument.capture(), any(Long.class)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java index 88058d1d54132..a75b613a17ae8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java @@ -1403,6 +1403,12 @@ public void testHeartbeatSentOnStaledMember() { assertEquals(MemberState.JOINING, membershipManager.state()); } + private void dropAssignedPartitions() { + SortedSet droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR); + droppedPartitions.addAll(subscriptionState.assignedPartitions()); + subscriptionState.assignFromSubscribed(Collections.emptySet()); + } + private MembershipManagerImpl mockMemberSuccessfullyReceivesAndAcksAssignment( Uuid topicId, String topicName, List partitions) { MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index b3f1f0c458f7d..01e9aec257878 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -533,7 +533,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly")) def testExpandingTopicSubscriptions(quorum: String, groupProtocol: String): Unit = { val otherTopic = "other" val initialAssignment = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1)) @@ -548,7 +548,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly")) def testShrinkingTopicSubscriptions(quorum: String, groupProtocol: String): Unit = { val otherTopic = "other" createTopic(otherTopic, 2, brokerCount)