diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 0e93b5f6e21c8..918a170598f4a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -79,6 +79,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; import org.slf4j.Logger; +import org.slf4j.event.Level; import java.net.InetSocketAddress; import java.time.Duration; @@ -93,10 +94,13 @@ import java.util.Optional; import java.util.OptionalLong; import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -119,6 +123,7 @@ import static org.apache.kafka.common.utils.Utils.closeQuietly; import static org.apache.kafka.common.utils.Utils.isBlank; import static org.apache.kafka.common.utils.Utils.join; +import static org.apache.kafka.common.utils.Utils.swallow; /** * This {@link Consumer} implementation uses an {@link ApplicationEventHandler event handler} to process @@ -210,7 +215,7 @@ private void process(final GroupMetadataUpdateEvent event) { private final ApplicationEventHandler applicationEventHandler; private final Time time; - private Optional groupMetadata; + private Optional groupMetadata = Optional.empty(); private final KafkaConsumerMetrics kafkaConsumerMetrics; private Logger log; private final String clientId; @@ -233,6 +238,7 @@ private void process(final GroupMetadataUpdateEvent event) { 
private final Metrics metrics; private final long retryBackoffMs; private final int defaultApiTimeoutMs; + private final boolean autoCommitEnabled; private volatile boolean closed = false; private final List assignors; private final Optional clientTelemetryReporter; @@ -264,6 +270,7 @@ private void process(final GroupMetadataUpdateEvent event) { GroupRebalanceConfig.ProtocolType.CONSUMER ); this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); + this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); LogContext logContext = createLogContext(config, groupRebalanceConfig); this.log = logContext.logger(getClass()); @@ -378,7 +385,8 @@ private void process(final GroupMetadataUpdateEvent event) { long retryBackoffMs, int defaultApiTimeoutMs, List assignors, - String groupId) { + String groupId, + boolean autoCommitEnabled) { this.log = logContext.logger(getClass()); this.subscriptions = subscriptions; this.clientId = clientId; @@ -398,6 +406,7 @@ private void process(final GroupMetadataUpdateEvent event) { this.assignors = assignors; this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer"); this.clientTelemetryReporter = Optional.empty(); + this.autoCommitEnabled = autoCommitEnabled; } // Visible for testing @@ -413,6 +422,7 @@ private void process(final GroupMetadataUpdateEvent event) { this.log = logContext.logger(getClass()); this.subscriptions = subscriptions; this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); + this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); this.fetchBuffer = new FetchBuffer(logContext); this.isolationLevel = IsolationLevel.READ_UNCOMMITTED; this.interceptors = new ConsumerInterceptors<>(Collections.emptyList()); @@ -1024,15 +1034,14 @@ private void close(Duration timeout, boolean swallowException) { final Timer closeTimer = time.timer(timeout); clientTelemetryReporter.ifPresent(reporter -> 
reporter.initiateClose(timeout.toMillis())); closeTimer.update(); - + // Prepare shutting down the network thread + swallow(log, Level.ERROR, "Unexpected exception when preparing for shutdown", () -> prepareShutdown(closeTimer), firstException); + closeTimer.update(); if (applicationEventHandler != null) - closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed to close application event handler with a timeout(ms)=" + closeTimer.remainingMs(), firstException); - - // Invoke all callbacks after the background thread exists in case if there are unsent async - // commits - maybeInvokeCommitCallbacks(); - - closeQuietly(fetchBuffer, "Failed to close the fetch buffer", firstException); + closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException); + closeTimer.update(); + // Ensure all async commit callbacks are invoked + swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback", this::maybeInvokeCommitCallbacks, firstException); closeQuietly(interceptors, "consumer interceptors", firstException); closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException); closeQuietly(metrics, "consumer metrics", firstException); @@ -1050,6 +1059,57 @@ private void close(Duration timeout, boolean swallowException) { } } + /** + * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: + * 1. autocommit offsets + * 2. 
revoke all partitions + */ + private void prepareShutdown(final Timer timer) { + if (!groupMetadata.isPresent()) + return; + + maybeAutoCommitSync(timer); + timer.update(); + if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty()) + return; + + try { + // If the consumer is in a group, we will pause and revoke all assigned partitions + onLeavePrepare().get(timer.remainingMs(), TimeUnit.MILLISECONDS); + timer.update(); + } catch (Exception e) { + Exception exception = e; + if (e instanceof ExecutionException) + exception = (Exception) e.getCause(); + throw new KafkaException("User rebalance callback throws an error", exception); + } finally { + subscriptions.assignFromSubscribed(Collections.emptySet()); + } + } + + private void maybeAutoCommitSync(final Timer timer) { + if (autoCommitEnabled) { + Map allConsumed = subscriptions.allConsumed(); + try { + log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed); + commitSync(allConsumed, Duration.ofMillis(timer.remainingMs())); + } catch (Exception e) { + // consistent with async auto-commit failures, we do not propagate the exception + log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage()); + } + } + } + + private CompletableFuture onLeavePrepare() { + SortedSet droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR); + droppedPartitions.addAll(subscriptions.assignedPartitions()); + if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + // TODO: Invoke rebalanceListener via KAFKA-15276 + return CompletableFuture.completedFuture(null); + } + @Override public void wakeup() { wakeupTrigger.wakeup(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index 
07bb9811b5b6f..f0520230488a8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -62,6 +62,7 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { private ApplicationEventProcessor applicationEventProcessor; private NetworkClientDelegate networkClientDelegate; private RequestManagers requestManagers; + private Optional membershipManager; private volatile boolean running; private final IdempotentCloser closer = new IdempotentCloser(); private volatile Duration closeTimeout = Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS); @@ -106,6 +107,7 @@ void initializeResources() { applicationEventProcessor = applicationEventProcessorSupplier.get(); networkClientDelegate = networkClientDelegateSupplier.get(); requestManagers = requestManagersSupplier.get(); + membershipManager = requestManagersSupplier.get().membershipManager; } /** @@ -143,7 +145,6 @@ void runOnce() { .map(networkClientDelegate::addAll) .reduce(MAX_POLL_TIMEOUT_MS, Math::min); networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs); - cachedMaximumTimeToWait = requestManagers.entries().stream() .filter(Optional::isPresent) .map(Optional::get) @@ -194,11 +195,11 @@ static void runAtClose(final Collection> requ // Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until // all requests have received a response. 
- do { + while (timer.notExpired() && !requestFutures.stream().allMatch(Future::isDone)) { pollWaitTimeMs = Math.min(pollWaitTimeMs, timer.remainingMs()); networkClientDelegate.poll(pollWaitTimeMs, timer.currentTimeMs()); timer.update(); - } while (timer.notExpired() && !requestFutures.stream().allMatch(Future::isDone)); + } } public boolean isRunning() { @@ -274,15 +275,17 @@ private void closeInternal(final Duration timeout) { } void cleanup() { + log.trace("Closing the consumer network thread"); + Timer timer = time.timer(closeTimeout); try { - log.trace("Closing the consumer network thread"); - Timer timer = time.timer(closeTimeout); - maybeAutocommitOnClose(timer); + prepareForShutdown(timer); + // Send out the unsent commits and try to complete them before timer runs out runAtClose(requestManagers.entries(), networkClientDelegate, timer); - maybeLeaveGroup(timer); + maybeLeaveGroup(); } catch (Exception e) { log.error("Unexpected error during shutdown. Proceed with closing.", e); } finally { + networkClientDelegate.awaitPendingRequests(timer); closeQuietly(requestManagers, "request managers"); closeQuietly(networkClientDelegate, "network client delegate"); closeQuietly(applicationEventProcessor, "application event processor"); @@ -290,39 +293,40 @@ void cleanup() { } } - /** - * We need to autocommit before shutting down the consumer. The method needs to first connect to the coordinator - * node to construct the closing requests. Then wait for all closing requests to finish before returning. The - * method is bounded by a closing timer. We will continue closing down the consumer if the requests cannot be - * completed in time. 
- */ - // Visible for testing - void maybeAutocommitOnClose(final Timer timer) { + private void prepareForShutdown(final Timer timer) { if (!requestManagers.coordinatorRequestManager.isPresent()) return; - if (!requestManagers.commitRequestManager.isPresent()) { log.error("Expecting a CommitRequestManager but the object was never initialized. Shutting down."); return; } - if (!requestManagers.commitRequestManager.get().canAutoCommit()) { - return; - } - + // We need a coordinator node for the commit request manager to send the remaining commit requests ensureCoordinatorReady(timer); - NetworkClientDelegate.UnsentRequest autocommitRequest = - requestManagers.commitRequestManager.get().createCommitAllConsumedRequest(); - networkClientDelegate.add(autocommitRequest); - do { - long currentTimeMs = timer.currentTimeMs(); - ensureCoordinatorReady(timer); - networkClientDelegate.poll(timer.remainingMs(), currentTimeMs); - } while (timer.notExpired() && !autocommitRequest.future().isDone()); } - void maybeLeaveGroup(final Timer timer) { - // TODO: Leave group upon closing the consumer + /** + * Leave the group when the consumer is shutting down. 
+ */ + void maybeLeaveGroup() { + if (!membershipManager.isPresent()) + return; + + // We do not need to send leave group when the member is unsubscribed or fatal + if (membershipManager.get().shouldSkipHeartbeat()) + return; + + membershipManager.get().leaveGroupOnClose(); + HeartbeatRequestManager hrm = requestManagers.heartbeatRequestManager.orElseThrow(() -> + new IllegalStateException( + "Expecting a GroupHeartbeatRequest but the object was never initialized.")); + log.debug("Sending GroupHeartbeatRequest with epoch {} to coordinator {} to leave the group before close", + membershipManager.get().memberEpoch(), + requestManagers.coordinatorRequestManager.get().coordinator()); + long nowMs = time.milliseconds(); + // Ensure the request gets sent out + networkClientDelegate.addAll(hrm.poll(nowMs)); + networkClientDelegate.poll(0, nowMs); } private void ensureCoordinatorReady(final Timer timer) { @@ -331,7 +335,7 @@ private void ensureCoordinatorReady(final Timer timer) { } } - private boolean coordinatorReady() { + boolean coordinatorReady() { CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.orElseThrow( () -> new IllegalStateException("CoordinatorRequestManager uninitialized.")); Optional coordinator = coordinatorRequestManager.coordinator(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java index 4727daa0f64b8..0513ce1f244ae 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java @@ -117,6 +117,11 @@ public interface MembershipManager { */ CompletableFuture leaveGroup(); + /** + * Leaving the group when the user closes the consumer. 
+ */ + void leaveGroupOnClose(); + /** * @return True if the member should send heartbeat to the coordinator without waiting for * the interval. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java index c6393bbeefcd4..8d80a1fd20547 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java @@ -104,14 +104,14 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource /** * TopicPartition comparator based on topic name and partition id. */ - private final static TopicPartitionComparator TOPIC_PARTITION_COMPARATOR = + final static TopicPartitionComparator TOPIC_PARTITION_COMPARATOR = new TopicPartitionComparator(); /** * TopicIdPartition comparator based on topic name and partition id (ignoring ID while sorting, * as this is sorted mainly for logging purposes). */ - private final static TopicIdPartitionComparator TOPIC_ID_PARTITION_COMPARATOR = + final static TopicIdPartitionComparator TOPIC_ID_PARTITION_COMPARATOR = new TopicIdPartitionComparator(); /** @@ -481,6 +481,28 @@ public CompletableFuture leaveGroup() { return leaveResult; } + /** + * When closing down the consumer, the subscriptions are reset by the application thread; therefore we just need to + * transition the state to LEAVING and set the epoch to -1/-2. 
+ @Override + public void leaveGroupOnClose() { + if (state == MemberState.UNSUBSCRIBED || + state == MemberState.FATAL || + state == MemberState.LEAVING) { + return; + } + + if (state == MemberState.PREPARE_LEAVING) { + transitionToSendingLeaveGroup(); + return; + } + + transitionTo(MemberState.PREPARE_LEAVING); + transitionToSendingLeaveGroup(); + leaveGroupInProgress = Optional.of(CompletableFuture.completedFuture(null)); + } + /** * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or * onPartitionsLost. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index 141f5f955c8b5..10e0b345dcb71 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -64,10 +64,10 @@ public class NetworkClientDelegate implements AutoCloseable { private final long retryBackoffMs; public NetworkClientDelegate( - final Time time, - final ConsumerConfig config, - final LogContext logContext, - final KafkaClient client) { + final Time time, + final ConsumerConfig config, + final LogContext logContext, + final KafkaClient client) { this.time = time; this.client = client; this.log = logContext.logger(getClass()); @@ -130,6 +130,21 @@ public void poll(final long timeoutMs, final long currentTimeMs) { checkDisconnects(currentTimeMs); } + /** + * Block until all pending (unsent) requests have completed or the timer expires. 
+ */ + public void awaitPendingRequests(Timer timer) { + while (!unsentRequests().isEmpty() && timer.notExpired()) { + poll(timer.remainingMs(), timer.currentTimeMs()); + timer.update(); + } + + if (!unsentRequests.isEmpty()) { + log.warn("Close timed out with {} pending requests to coordinator, terminating client connections", + unsentRequests.size()); + } + } + /** * Tries to send the requests in the unsentRequest queue. If the request doesn't have an assigned node, it will * find the leastLoadedOne, and will be retried in the next {@code poll()}. If the request is expired, a @@ -310,11 +325,11 @@ Optional node() { @Override public String toString() { return "UnsentRequest{" + - "requestBuilder=" + requestBuilder + - ", handler=" + handler + - ", node=" + node + - ", timer=" + timer + - '}'; + "requestBuilder=" + requestBuilder + + ", handler=" + handler + + ", node=" + node + + ", timer=" + timer + + '}'; } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index d39b63e979022..1da593446718a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.clients.consumer.internals; -import java.util.Locale; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; @@ -81,6 +80,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -97,6 +97,7 @@ import static java.util.Arrays.asList; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; +import 
static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_ID; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; @@ -129,8 +130,8 @@ public class AsyncKafkaConsumerTest { @BeforeEach public void setup() { - // By default, the consumer is part of a group and autoCommit is enabled. - setup(ConsumerTestBuilder.createDefaultGroupInformation(), true); + // By default, the consumer is part of a group and autoCommit is disabled. + setup(ConsumerTestBuilder.createDefaultGroupInformation(), false); } private void setup(Optional groupInfo, boolean enableAutoCommit) { @@ -164,14 +165,10 @@ private void resetWithAutoCommitEnabled() { setup(ConsumerTestBuilder.createDefaultGroupInformation(), true); } - @Test - public void testSuccessfulStartupShutdown() { - assertDoesNotThrow(() -> consumer.close()); - } - @Test public void testSuccessfulStartupShutdownWithAutoCommit() { resetWithAutoCommitEnabled(); + prepareCoordinatorResponse(DEFAULT_GROUP_ID, Errors.NONE); TopicPartition tp = new TopicPartition("topic", 0); consumer.assign(singleton(tp)); consumer.seek(tp, 100); @@ -187,6 +184,7 @@ public void testInvalidGroupId() { @Test public void testFailOnClosedConsumer() { + prepareCoordinatorResponse(DEFAULT_GROUP_ID, Errors.NONE); consumer.close(); final IllegalStateException res = assertThrows(IllegalStateException.class, consumer::assignment); assertEquals("This consumer has already been closed.", res.getMessage()); @@ -325,7 +323,6 @@ public void testWakeupAfterEmptyFetch() { assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); } - @Test public void testWakeupAfterNonEmptyFetch() { final String topicName = "foo"; final int partition = 3; @@ -427,6 +424,7 @@ public void testCommitSyncLeaderEpochUpdate() { CompletableFuture commitFuture = new CompletableFuture<>(); 
commitFuture.complete(null); + prepareCommit(Arrays.asList(t1, t0), DEFAULT_GROUP_ID, Errors.NONE); try (MockedConstruction ignored = commitEventMocker(commitFuture)) { assertDoesNotThrow(() -> consumer.commitSync(topicPartitionOffsets)); } @@ -445,10 +443,15 @@ public void testCommitAsyncLeaderEpochUpdate() { topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L, Optional.of(1), "")); consumer.assign(Arrays.asList(t0, t1)); + consumer.seek(t0, 10); + consumer.seek(t1, 20); CompletableFuture commitFuture = new CompletableFuture<>(); commitFuture.complete(null); + prepareCommit(Arrays.asList(t1, t0), DEFAULT_GROUP_ID, Errors.NONE); + // TODO: The log shows NPE thrown from the CommitRequestManager, which is caused by the use of mock. + // Ideally, we will need to abandon the use of spy() in the testBuilder and mock the dependencies instead. try (MockedConstruction ignored = commitEventMocker(commitFuture)) { assertDoesNotThrow(() -> consumer.commitAsync(topicPartitionOffsets, callback)); } @@ -474,6 +477,8 @@ public void testEnsurePollExecutedCommitAsyncCallbacks() { public void testEnsureShutdownExecutedCommitAsyncCallbacks() { MockCommitCallback callback = new MockCommitCallback(); CompletableFuture future = new CompletableFuture<>(); + testBuilder.client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, DEFAULT_GROUP_ID, + testBuilder.metadata.fetch().nodes().get(0))); doReturn(future).when(consumer).commit(new HashMap<>(), false); assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); future.complete(null); @@ -953,17 +958,23 @@ public void testGroupIdNull() { @Test public void testGroupIdNotNullAndValid() { - final Properties props = requiredConsumerPropertiesAndGroupId("consumerGroupA"); + // close the default consumer + shutDown(); + final Properties props = requiredConsumerProperties(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000); 
props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true); final ConsumerConfig config = new ConsumerConfig(props); - - try (final AsyncKafkaConsumer consumer = - new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) { + try { + AsyncKafkaConsumer consumer = + new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer()); assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); } catch (final Exception exception) { throw new AssertionFailedError("The following exception was not expected:", exception); + } finally { + // We need 0ms shutdown to avoid leaving the test hanging for the default duration + consumer.close(Duration.ofMillis(0)); } } @@ -1113,21 +1124,9 @@ private HashMap mockTimestampToSearch() { return timestampToSearch; } - private void prepAutocommitOnClose() { - Node node = testBuilder.metadata.fetch().nodes().get(0); - testBuilder.client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); - if (!testBuilder.subscriptions.allConsumed().isEmpty()) { - List topicPartitions = new ArrayList<>(testBuilder.subscriptions.assignedPartitionsList()); - testBuilder.client.prepareResponse(mockAutocommitResponse( - topicPartitions, - (short) 1, - Errors.NONE).responseBody()); - } - } - - private ClientResponse mockAutocommitResponse(final List topicPartitions, - final short apiKeyVersion, - final Errors error) { + private ClientResponse mockCommitResponse(final List topicPartitions, + final short apiKeyVersion, + final Errors error) { OffsetCommitResponseData responseData = new OffsetCommitResponseData(); List responseTopics = new ArrayList<>(); topicPartitions.forEach(tp -> { @@ -1153,5 +1152,33 @@ private ClientResponse mockAutocommitResponse(final List topicPa new OffsetCommitResponse(responseData) ); } + + private void prepareCommit(List topicPartitions, String 
groupId, Errors error) { + testBuilder.client.prepareResponse(FindCoordinatorResponse.prepareResponse(error, groupId, + testBuilder.metadata.fetch().nodes().get(0))); + testBuilder.client.prepareResponse(mockCommitResponse( + topicPartitions, + ApiKeys.OFFSET_COMMIT.latestVersion(), + Errors.NONE).responseBody()); + } + + private void prepareCoordinatorResponse(String groupId, Errors error) { + Node node = testBuilder.metadata.fetch().nodes().get(0); + testBuilder.client.prepareResponse(FindCoordinatorResponse.prepareResponse(error, groupId, + node)); + } + + + private void prepAutocommitOnClose() { + Node node = testBuilder.metadata.fetch().nodes().get(0); + testBuilder.client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, DEFAULT_GROUP_ID, + node)); + if (!testBuilder.subscriptions.allConsumed().isEmpty()) { + doAnswer(i -> { + System.out.println("auto commit sync on close"); + return null; + }).when(consumer).commitSync(any(), any()); + } + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java index a1370918e4dcb..42b7682b0d70f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java @@ -56,7 +56,7 @@ import java.util.concurrent.CompletableFuture; import static java.util.Collections.singleton; -import static java.util.Collections.singletonMap; +import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_ID; import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS; import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REQUEST_TIMEOUT_MS; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; @@ -67,6 +67,7 
@@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -85,6 +86,8 @@ public class ConsumerNetworkThreadTest { private OffsetsRequestManager offsetsRequestManager; private CommitRequestManager commitRequestManager; private CoordinatorRequestManager coordinatorRequestManager; + private HeartbeatRequestManager heartbeatRequestManager; + private MembershipManager memberhipsManager; private ConsumerNetworkThread consumerNetworkThread; private MockClient client; private SubscriptionState subscriptions; @@ -101,6 +104,8 @@ public void setup() { commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); offsetsRequestManager = testBuilder.offsetsRequestManager; coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); + heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new); + memberhipsManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new); consumerNetworkThread = testBuilder.consumerNetworkThread; subscriptions = testBuilder.subscriptions; consumerNetworkThread.initializeResources(); @@ -293,28 +298,25 @@ void testCoordinatorConnectionOnClose() { Node node = metadata.fetch().nodes().get(0); coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds()); client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); - prepareOffsetCommitRequest(singletonMap(tp, 100L), Errors.NONE, false); consumerNetworkThread.cleanup(); assertTrue(coordinatorRequestManager.coordinator().isPresent()); - assertFalse(client.hasPendingResponses()); - assertFalse(client.hasInFlightRequests()); + 
assertFalse(client.hasPendingResponses(), + "There should be 0 pending response but found " + client.futureResponses().size()); + assertFalse(client.hasInFlightRequests(), + "There should be 0 pending request, but found " + client.requests().size()); } @Test - void testAutoCommitOnClose() { - TopicPartition tp = new TopicPartition("topic", 0); + void testEnsurePollHeartbeatOnClose() { + when(memberhipsManager.shouldSkipHeartbeat()).thenReturn(false); Node node = metadata.fetch().nodes().get(0); - subscriptions.assignFromUser(singleton(tp)); - subscriptions.seek(tp, 100); - coordinatorRequestManager.markCoordinatorUnknown("test", time.milliseconds()); - client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); - prepareOffsetCommitRequest(singletonMap(tp, 100L), Errors.NONE, false); - consumerNetworkThread.maybeAutocommitOnClose(time.timer(1000)); - assertTrue(coordinatorRequestManager.coordinator().isPresent()); - verify(commitRequestManager).createCommitAllConsumedRequest(); - - assertFalse(client.hasPendingResponses()); - assertFalse(client.hasInFlightRequests()); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, DEFAULT_GROUP_ID, node)); + consumerNetworkThread.cleanup(); + doAnswer(invocation -> { + System.out.println("leave group on close"); + return null; + }).when(memberhipsManager).leaveGroupOnClose(); + verify(heartbeatRequestManager).poll(anyLong()); } private void prepareTearDown() { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java index 97d53dead8534..2346427cf429a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java @@ -328,34 +328,35 @@ public AsyncKafkaConsumerTestBuilder(Optional groupInfo, 
boole super(groupInfo, enableAutoCommit, enableAutoTick); String clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); List assignors = ConsumerPartitionAssignor.getAssignorInstances( - config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), - config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)) + config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), + config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)) ); Deserializers deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer()); this.fetchCollector = spy(new FetchCollector<>(logContext, - metadata, - subscriptions, - fetchConfig, - deserializers, - metricsManager, - time)); + metadata, + subscriptions, + fetchConfig, + deserializers, + metricsManager, + time)); this.consumer = spy(new AsyncKafkaConsumer<>( - logContext, - clientId, - deserializers, - new FetchBuffer(logContext), - fetchCollector, - new ConsumerInterceptors<>(Collections.emptyList()), - time, - applicationEventHandler, - backgroundEventQueue, - metrics, - subscriptions, - metadata, - retryBackoffMs, - 60000, - assignors, - groupInfo.map(groupInformation -> groupInformation.groupState.groupId).orElse(null))); + logContext, + clientId, + deserializers, + new FetchBuffer(logContext), + fetchCollector, + new ConsumerInterceptors<>(Collections.emptyList()), + time, + applicationEventHandler, + backgroundEventQueue, + metrics, + subscriptions, + metadata, + retryBackoffMs, + 60000, + assignors, + groupInfo.map(groupInformation -> groupInformation.groupState.groupId).orElse(null), + enableAutoCommit)); } @Override