diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index e6de169f7b33f..4a4012f4dddae 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1234,7 +1234,7 @@ private void close(Duration timeout, boolean swallowException) { clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis())); closeTimer.update(); // Prepare shutting down the network thread - prepareShutdown(closeTimer, firstException); + releaseAssignmentAndLeaveGroup(closeTimer, firstException); closeTimer.update(); swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback.", () -> awaitPendingAsyncCommitsAndExecuteCommitCallbacks(closeTimer, false), firstException); @@ -1270,12 +1270,12 @@ private void close(Duration timeout, boolean swallowException) { * 2. revoke all partitions * 3. if partition revocation completes successfully, send leave group */ - void prepareShutdown(final Timer timer, final AtomicReference firstException) { + void releaseAssignmentAndLeaveGroup(final Timer timer, final AtomicReference firstException) { if (!groupMetadata.get().isPresent()) return; if (autoCommitEnabled) - autoCommitSync(timer); + commitSyncAllConsumed(timer); applicationEventHandler.add(new CommitOnCloseEvent()); completeQuietly( @@ -1287,7 +1287,7 @@ void prepareShutdown(final Timer timer, final AtomicReference firstEx } // Visible for testing - void autoCommitSync(final Timer timer) { + void commitSyncAllConsumed(final Timer timer) { Map allConsumed = subscriptions.allConsumed(); log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed); try { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index 7616ac6912289..64bba14837aaa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -207,8 +207,7 @@ private void reapExpiredApplicationEvents(long currentTimeMs) { */ // Visible for testing static void runAtClose(final Collection> requestManagers, - final NetworkClientDelegate networkClientDelegate, - final Timer timer) { + final NetworkClientDelegate networkClientDelegate) { // These are the optional outgoing requests at the requestManagers.stream() .filter(Optional::isPresent) @@ -300,15 +299,21 @@ private void sendUnsentRequests(final Timer timer) { networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs()); timer.update(); } while (timer.notExpired() && networkClientDelegate.hasAnyPendingRequests()); + + if (networkClientDelegate.hasAnyPendingRequests()) { + log.warn("Close timeout of {} ms expired before the consumer network thread was able " + + "to complete pending requests. Inflight request count: {}, Unsent request count: {}", + timer.timeoutMs(), networkClientDelegate.inflightRequestCount(), networkClientDelegate.unsentRequests().size()); + } } void cleanup() { log.trace("Closing the consumer network thread"); Timer timer = time.timer(closeTimeout); try { - runAtClose(requestManagers.entries(), networkClientDelegate, timer); + runAtClose(requestManagers.entries(), networkClientDelegate); } catch (Exception e) { - log.error("Unexpected error during shutdown. Proceed with closing.", e); + log.error("Unexpected error during shutdown. Proceed with closing.", e); } finally { sendUnsentRequests(timer); applicationEventReaper.reap(applicationEventQueue); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index 310e0c417f617..8fd5e16434b31 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -90,6 +90,10 @@ Queue unsentRequests() { return unsentRequests; } + public int inflightRequestCount() { + return client.inFlightRequestCount(); + } + /** * Check if the node is disconnected and unavailable for immediate reconnection (i.e. if it is in * reconnect backoff window following the disconnect). diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 5f59794797815..2d126b3eac6c2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -881,7 +881,7 @@ public void testAutoCommitSyncEnabled() { consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class)); subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0))); subscriptions.seek(new TopicPartition("topic", 0), 100); - consumer.autoCommitSync(time.timer(100)); + consumer.commitSyncAllConsumed(time.timer(100)); verify(applicationEventHandler).add(any(SyncCommitEvent.class)); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java index d65d6d5f2fc43..2991a852be810 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java @@ -372,7 +372,7 @@ public void testFetcherCloseClosesFetchSessionsInBroker() { // NOTE: by design the FetchRequestManager doesn't perform network I/O internally. That means that calling // the close() method with a Timer will NOT send out the close session requests on close. The network // I/O logic is handled inside ConsumerNetworkThread.runAtClose, so we need to run that logic here. - ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), networkClientDelegate, timer); + ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), networkClientDelegate); // the network is polled during the last state of clean up. networkClientDelegate.poll(time.timer(1)); // validate that closing the fetcher has sent a request with final epoch. 2 requests are sent, one for the