From e56a0de815531a1d59b12550b078d5e3a31361f5 Mon Sep 17 00:00:00 2001 From: Lianet Magrans <98415067+lianetm@users.noreply.github.com> Date: Thu, 13 Jun 2024 08:31:16 +0200 Subject: [PATCH 1/3] MINOR: Improving log for outstanding requests on close and cleanup (#16304) Reviewers: Andrew Schofield , Chia-Ping Tsai --- .../consumer/internals/AsyncKafkaConsumer.java | 8 ++++---- .../consumer/internals/ConsumerNetworkThread.java | 13 +++++++++---- .../consumer/internals/NetworkClientDelegate.java | 4 ++++ .../consumer/internals/AsyncKafkaConsumerTest.java | 2 +- .../consumer/internals/FetchRequestManagerTest.java | 2 +- 5 files changed, 19 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index cb2c8aa1048cc..98cc812735e80 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1233,7 +1233,7 @@ private void close(Duration timeout, boolean swallowException) { clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis())); closeTimer.update(); // Prepare shutting down the network thread - prepareShutdown(closeTimer, firstException); + releaseAssignmentAndLeaveGroup(closeTimer, firstException); closeTimer.update(); swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback.", () -> awaitPendingAsyncCommitsAndExecuteCommitCallbacks(closeTimer, false), firstException); @@ -1269,12 +1269,12 @@ private void close(Duration timeout, boolean swallowException) { * 2. revoke all partitions * 3. 
if partition revocation completes successfully, send leave group */ - void prepareShutdown(final Timer timer, final AtomicReference firstException) { + void releaseAssignmentAndLeaveGroup(final Timer timer, final AtomicReference firstException) { if (!groupMetadata.get().isPresent()) return; if (autoCommitEnabled) - autoCommitSync(timer); + commitSyncAllConsumed(timer); applicationEventHandler.add(new CommitOnCloseEvent()); completeQuietly(() -> maybeRevokePartitions(), @@ -1284,7 +1284,7 @@ void prepareShutdown(final Timer timer, final AtomicReference firstEx } // Visible for testing - void autoCommitSync(final Timer timer) { + void commitSyncAllConsumed(final Timer timer) { Map allConsumed = subscriptions.allConsumed(); log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed); try { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index 7616ac6912289..64bba14837aaa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -207,8 +207,7 @@ private void reapExpiredApplicationEvents(long currentTimeMs) { */ // Visible for testing static void runAtClose(final Collection> requestManagers, - final NetworkClientDelegate networkClientDelegate, - final Timer timer) { + final NetworkClientDelegate networkClientDelegate) { // These are the optional outgoing requests at the requestManagers.stream() .filter(Optional::isPresent) @@ -300,15 +299,21 @@ private void sendUnsentRequests(final Timer timer) { networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs()); timer.update(); } while (timer.notExpired() && networkClientDelegate.hasAnyPendingRequests()); + + if (networkClientDelegate.hasAnyPendingRequests()) { + log.warn("Close timeout of {} 
ms expired before the consumer network thread was able " + + "to complete pending requests. Inflight request count: {}, Unsent request count: {}", + timer.timeoutMs(), networkClientDelegate.inflightRequestCount(), networkClientDelegate.unsentRequests().size()); + } } void cleanup() { log.trace("Closing the consumer network thread"); Timer timer = time.timer(closeTimeout); try { - runAtClose(requestManagers.entries(), networkClientDelegate, timer); + runAtClose(requestManagers.entries(), networkClientDelegate); } catch (Exception e) { - log.error("Unexpected error during shutdown. Proceed with closing.", e); + log.error("Unexpected error during shutdown. Proceed with closing.", e); } finally { sendUnsentRequests(timer); applicationEventReaper.reap(applicationEventQueue); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index d069a0d1fb64b..83015acd89af0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -81,6 +81,10 @@ Queue unsentRequests() { return unsentRequests; } + public int inflightRequestCount() { + return client.inFlightRequestCount(); + } + /** * Check if the node is disconnected and unavailable for immediate reconnection (i.e. if it is in * reconnect backoff window following the disconnect). 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index c5d0a797e1c89..6057ce63721a0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -1034,7 +1034,7 @@ public void testAutoCommitSyncEnabled() { consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class)); subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0))); subscriptions.seek(new TopicPartition("topic", 0), 100); - consumer.autoCommitSync(time.timer(100)); + consumer.commitSyncAllConsumed(time.timer(100)); verify(applicationEventHandler).add(any(SyncCommitEvent.class)); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java index 71a267b524612..204f87e1c1ff5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java @@ -370,7 +370,7 @@ public void testFetcherCloseClosesFetchSessionsInBroker() { // NOTE: by design the FetchRequestManager doesn't perform network I/O internally. That means that calling // the close() method with a Timer will NOT send out the close session requests on close. The network // I/O logic is handled inside ConsumerNetworkThread.runAtClose, so we need to run that logic here. - ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), networkClientDelegate, timer); + ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), networkClientDelegate); // the network is polled during the last state of clean up. 
networkClientDelegate.poll(time.timer(1)); // validate that closing the fetcher has sent a request with final epoch. 2 requests are sent, one for the From b01c578dca41e4810f29169cf24e04dd828e7a3b Mon Sep 17 00:00:00 2001 From: Lianet Magrans <98415067+lianetm@users.noreply.github.com> Date: Mon, 17 Jun 2024 21:27:33 +0200 Subject: [PATCH 2/3] KAFKA-16954: fix consumer close to release assignment in background (#16343) This PR fixes consumer close to avoid updating the subscription state object in the app thread. Now the close simply triggers an UnsubscribeEvent that is handled in the background to trigger callbacks, clear assignment, and send leave heartbeat. Note that after triggering the event, the unsubscribe will continuously process background events until the event completes, to ensure that it allows for callbacks to run in the app thread. The logic around what happens if the unsubscribe fails remains unchanged: close will log, keep the first exception and carry on. It also removes the redundant LeaveOnClose event (it used to do the exact same thing as the UnsubscribeEvent, both calling membershipMgr.leaveGroup). 
Reviewers: Lucas Brutschy --- .../internals/AsyncKafkaConsumer.java | 61 ++++---------- .../internals/events/ApplicationEvent.java | 2 +- .../events/ApplicationEventProcessor.java | 19 ----- .../internals/events/LeaveOnCloseEvent.java | 24 ------ .../internals/AsyncKafkaConsumerTest.java | 83 +++++++++---------- .../events/ApplicationEventProcessorTest.java | 15 ---- 6 files changed, 57 insertions(+), 147 deletions(-) delete mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 98cc812735e80..6622cb286d9f1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -53,7 +53,6 @@ import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.events.EventProcessor; import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; -import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; import org.apache.kafka.clients.consumer.internals.events.PollEvent; @@ -90,7 +89,6 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; -import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.event.Level; @@ -109,7 +107,6 @@ import java.util.OptionalLong; import java.util.Set; import java.util.SortedSet; -import java.util.TreeSet; import java.util.concurrent.BlockingQueue; import 
java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; @@ -1233,8 +1230,8 @@ private void close(Duration timeout, boolean swallowException) { clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis())); closeTimer.update(); // Prepare shutting down the network thread - releaseAssignmentAndLeaveGroup(closeTimer, firstException); - closeTimer.update(); + swallow(log, Level.ERROR, "Failed to release assignment before closing consumer", + () -> releaseAssignmentAndLeaveGroup(closeTimer), firstException); swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback.", () -> awaitPendingAsyncCommitsAndExecuteCommitCallbacks(closeTimer, false), firstException); if (applicationEventHandler != null) @@ -1266,10 +1263,10 @@ private void close(Duration timeout, boolean swallowException) { /** * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: * 1. autocommit offsets - * 2. revoke all partitions - * 3. if partition revocation completes successfully, send leave group + * 2. release assignment. 
This is done via a background unsubscribe event that will + * trigger the callbacks, clear the assignment on the subscription state and send the leave group request to the broker */ - void releaseAssignmentAndLeaveGroup(final Timer timer, final AtomicReference firstException) { + private void releaseAssignmentAndLeaveGroup(final Timer timer) { if (!groupMetadata.get().isPresent()) return; @@ -1277,10 +1274,19 @@ void releaseAssignmentAndLeaveGroup(final Timer timer, final AtomicReference maybeRevokePartitions(), - "Failed to execute callback to release assignment", firstException); - completeQuietly(() -> applicationEventHandler.addAndGet(new LeaveOnCloseEvent(calculateDeadlineMs(timer))), - "Failed to send leave group heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException); + + log.info("Releasing assignment and leaving group before closing consumer"); + UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(calculateDeadlineMs(timer)); + applicationEventHandler.add(unsubscribeEvent); + try { + processBackgroundEvents(unsubscribeEvent.future(), timer); + log.info("Completed releasing assignment and sending leave group to close consumer"); + } catch (TimeoutException e) { + log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't " + + "complete it within {} ms. 
It will proceed to close.", timer.timeoutMs()); + } finally { + timer.update(); + } } // Visible for testing @@ -1296,37 +1302,6 @@ void commitSyncAllConsumed(final Timer timer) { timer.update(); } - // Visible for testing - void maybeRevokePartitions() { - if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty()) - return; - try { - SortedSet droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR); - droppedPartitions.addAll(subscriptions.assignedPartitions()); - if (subscriptions.rebalanceListener().isPresent()) - subscriptions.rebalanceListener().get().onPartitionsRevoked(droppedPartitions); - } catch (Exception e) { - throw new KafkaException(e); - } finally { - subscriptions.assignFromSubscribed(Collections.emptySet()); - } - } - - // Visible for testing - void completeQuietly(final Utils.ThrowingRunnable function, - final String msg, - final AtomicReference firstException) { - try { - function.run(); - } catch (TimeoutException e) { - log.debug("Timeout expired before the {} operation could complete.", msg); - firstException.compareAndSet(null, e); - } catch (Exception e) { - log.error(msg, e); - firstException.compareAndSet(null, e); - } - } - @Override public void wakeup() { wakeupTrigger.wakeup(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index 2897117da8bab..ae5f7faeaefd4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -31,7 +31,7 @@ public enum Type { COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE, LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE, 
UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED, - COMMIT_ON_CLOSE, LEAVE_ON_CLOSE + COMMIT_ON_CLOSE } private final Type type; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 7ee0c09d40df9..9d0062f546fb8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import java.util.function.Supplier; @@ -119,10 +118,6 @@ public void process(ApplicationEvent event) { process((CommitOnCloseEvent) event); return; - case LEAVE_ON_CLOSE: - process((LeaveOnCloseEvent) event); - return; - default: log.warn("Application event type " + event.type() + " was not expected"); } @@ -268,20 +263,6 @@ private void process(@SuppressWarnings("unused") final CommitOnCloseEvent event) requestManagers.commitRequestManager.get().signalClose(); } - private void process(final LeaveOnCloseEvent event) { - if (!requestManagers.heartbeatRequestManager.isPresent()) { - event.future().complete(null); - return; - } - MembershipManager membershipManager = - Objects.requireNonNull(requestManagers.heartbeatRequestManager.get().membershipManager(), "Expecting " + - "membership manager to be non-null"); - log.debug("Leaving group before closing"); - CompletableFuture future = membershipManager.leaveGroup(); - // The future will be completed on heartbeat sent - future.whenComplete(complete(event.future())); - } - private BiConsumer complete(final CompletableFuture b) { return (value, exception) -> { if (exception != null) diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java deleted file mode 100644 index 647265a1500c8..0000000000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.clients.consumer.internals.events; - -public class LeaveOnCloseEvent extends CompletableApplicationEvent { - - public LeaveOnCloseEvent(final long deadlineMs) { - super(Type.LEAVE_ON_CLOSE, deadlineMs); - } -} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 6057ce63721a0..9b15938c460ab 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -42,7 +42,6 @@ import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.events.EventProcessor; import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; -import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; import org.apache.kafka.clients.consumer.internals.events.PollEvent; @@ -108,7 +107,6 @@ import java.util.stream.Stream; import static java.util.Arrays.asList; -import static java.util.Collections.emptySet; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED; @@ -131,7 +129,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -139,6 +136,7 @@ import static 
org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.Mockito.clearInvocations; @@ -241,6 +239,7 @@ private AsyncKafkaConsumer newConsumer( @Test public void testSuccessfulStartupShutdown() { consumer = newConsumer(); + completeUnsubscribeApplicationEventSuccessfully(); assertDoesNotThrow(() -> consumer.close()); } @@ -253,6 +252,7 @@ public void testInvalidGroupId() { @Test public void testFailOnClosedConsumer() { consumer = newConsumer(); + completeUnsubscribeApplicationEventSuccessfully(); consumer.close(); final IllegalStateException res = assertThrows(IllegalStateException.class, consumer::assignment); assertEquals("This consumer has already been closed.", res.getMessage()); @@ -945,6 +945,7 @@ public void testEnsurePollExecutedCommitAsyncCallbacks() { @Test public void testEnsureShutdownExecutedCommitAsyncCallbacks() { consumer = newConsumer(); + completeUnsubscribeApplicationEventSuccessfully(); MockCommitCallback callback = new MockCommitCallback(); completeCommitAsyncApplicationEventSuccessfully(); assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); @@ -956,68 +957,45 @@ public void testEnsureShutdownExecutedCommitAsyncCallbacks() { @Test public void testVerifyApplicationEventOnShutdown() { consumer = newConsumer(); + completeUnsubscribeApplicationEventSuccessfully(); doReturn(null).when(applicationEventHandler).addAndGet(any()); consumer.close(); - verify(applicationEventHandler).addAndGet(any(LeaveOnCloseEvent.class)); + verify(applicationEventHandler).add(any(UnsubscribeEvent.class)); verify(applicationEventHandler).add(any(CommitOnCloseEvent.class)); } @Test - public void testPartitionRevocationOnClose() { - MockRebalanceListener listener = new MockRebalanceListener(); - SubscriptionState subscriptions = new 
SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); - consumer = newConsumer( + public void testUnsubscribeOnClose() { + SubscriptionState subscriptions = mock(SubscriptionState.class); + consumer = spy(newConsumer( mock(FetchBuffer.class), mock(ConsumerInterceptors.class), mock(ConsumerRebalanceListenerInvoker.class), subscriptions, "group-id", - "client-id"); - - consumer.subscribe(singleton("topic"), listener); - subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0))); + "client-id")); + completeUnsubscribeApplicationEventSuccessfully(); consumer.close(Duration.ZERO); - assertTrue(subscriptions.assignedPartitions().isEmpty()); - assertEquals(1, listener.revokedCount); + verifyUnsubscribeEvent(subscriptions); } @Test public void testFailedPartitionRevocationOnClose() { // If rebalance listener failed to execute during close, we still send the leave group, // and proceed with closing the consumer. - ConsumerRebalanceListener listener = mock(ConsumerRebalanceListener.class); - SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); - consumer = newConsumer( + SubscriptionState subscriptions = mock(SubscriptionState.class); + consumer = spy(newConsumer( mock(FetchBuffer.class), new ConsumerInterceptors<>(Collections.emptyList()), mock(ConsumerRebalanceListenerInvoker.class), subscriptions, "group-id", - "client-id"); - subscriptions.subscribe(singleton("topic"), Optional.of(listener)); - TopicPartition tp = new TopicPartition("topic", 0); - subscriptions.assignFromSubscribed(singleton(tp)); - doThrow(new KafkaException()).when(listener).onPartitionsRevoked(eq(singleton(tp))); + "client-id")); + doThrow(new KafkaException()).when(consumer).processBackgroundEvents(any(), any()); assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO)); - verify(applicationEventHandler).addAndGet(any(LeaveOnCloseEvent.class)); - verify(listener).onPartitionsRevoked(eq(singleton(tp))); - 
assertEquals(emptySet(), subscriptions.assignedPartitions()); - } - - @Test - public void testCompleteQuietly() { - AtomicReference exception = new AtomicReference<>(); - CompletableFuture future = CompletableFuture.completedFuture(null); - consumer = newConsumer(); - assertDoesNotThrow(() -> consumer.completeQuietly(() -> { - future.get(0, TimeUnit.MILLISECONDS); - }, "test", exception)); - assertNull(exception.get()); - - assertDoesNotThrow(() -> consumer.completeQuietly(() -> { - throw new KafkaException("Test exception"); - }, "test", exception)); - assertInstanceOf(KafkaException.class, exception.get()); + verifyUnsubscribeEvent(subscriptions); + // Close operation should carry on even if the unsubscribe fails + verify(applicationEventHandler).close(any(Duration.class)); } @Test @@ -1318,7 +1296,7 @@ public void testNoWakeupInCloseCommit() { } return null; }).when(applicationEventHandler).add(any()); - + completeUnsubscribeApplicationEventSuccessfully(); consumer.close(Duration.ZERO); // A commit was triggered and not completed exceptionally by the wakeup @@ -1356,6 +1334,7 @@ public void testCloseAwaitPendingAsyncCommitComplete() { completeCommitAsyncApplicationEventSuccessfully(); consumer.commitAsync(cb); + completeUnsubscribeApplicationEventSuccessfully(); assertDoesNotThrow(() -> consumer.close(Duration.ofMillis(10))); assertEquals(1, cb.invoked); } @@ -1370,6 +1349,7 @@ public void testInterceptorAutoCommitOnClose() { consumer = newConsumer(props); assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); completeCommitSyncApplicationEventSuccessfully(); + completeUnsubscribeApplicationEventSuccessfully(); consumer.close(Duration.ZERO); @@ -1958,9 +1938,9 @@ public void testLongPollWaitIsLimited() { // Mock the subscription being assigned as the first fetch is collected consumer.subscriptions().assignFromSubscribed(Collections.singleton(tp)); return Fetch.empty(); - }).doAnswer(invocation -> { - return Fetch.forPartition(tp, records, true); - 
}).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + }).doAnswer(invocation -> + Fetch.forPartition(tp, records, true) + ).when(fetchCollector).collectFetch(any(FetchBuffer.class)); // And then poll for up to 10000ms, which should return 2 records without timing out ConsumerRecords returnedRecords = consumer.poll(Duration.ofMillis(10000)); @@ -2074,6 +2054,7 @@ public void testPollThrowsInterruptExceptionIfInterrupted() { @Test void testReaperInvokedInClose() { consumer = newConsumer(); + completeUnsubscribeApplicationEventSuccessfully(); consumer.close(); verify(backgroundEventReaper).reap(backgroundEventQueue); } @@ -2095,6 +2076,18 @@ void testReaperInvokedInPoll() { verify(backgroundEventReaper).reap(time.milliseconds()); } + private void verifyUnsubscribeEvent(SubscriptionState subscriptions) { + // Check that an unsubscribe event was generated, and that the consumer waited for it to + // complete processing background events. + verify(applicationEventHandler).add(any(UnsubscribeEvent.class)); + verify(consumer).processBackgroundEvents(any(), any()); + + // The consumer should not clear the assignment in the app thread. The unsubscribe + // event is the one responsible for updating the assignment in the background when it + // completes. 
+ verify(subscriptions, never()).assignFromSubscribed(any()); + } + private Map mockTopicPartitionOffset() { final TopicPartition t0 = new TopicPartition("t0", 2); final TopicPartition t1 = new TopicPartition("t0", 3); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index 451743ae2ad83..41569b73b076c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -27,8 +27,6 @@ import org.apache.kafka.clients.consumer.internals.RequestManagers; import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager; import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.common.utils.Time; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -38,12 +36,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; -import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; public class ApplicationEventProcessorTest { private final Time time = new MockTime(1); @@ -90,16 +85,6 @@ public void testPrepClosingCommitEvents() { verify(commitRequestManager).signalClose(); } - @Test - public void testPrepClosingLeaveGroupEvent() { - LeaveOnCloseEvent event = new LeaveOnCloseEvent(calculateDeadlineMs(time, 100)); - when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager); - 
when(membershipManager.leaveGroup()).thenReturn(CompletableFuture.completedFuture(null)); - processor.process(event); - verify(membershipManager).leaveGroup(); - assertTrue(event.future().isDone()); - } - private List mockCommitResults() { return Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class)); } From ceb4aceb9977a4b8a5c844c3e4c66988c4004a00 Mon Sep 17 00:00:00 2001 From: Lianet Magrans Date: Mon, 17 Jun 2024 17:04:04 -0400 Subject: [PATCH 3/3] Remove unrelated cleanup --- .../clients/consumer/internals/AsyncKafkaConsumerTest.java | 6 +++--- .../internals/events/ApplicationEventProcessorTest.java | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 9b15938c460ab..09efcbbad98f5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -1938,9 +1938,9 @@ public void testLongPollWaitIsLimited() { // Mock the subscription being assigned as the first fetch is collected consumer.subscriptions().assignFromSubscribed(Collections.singleton(tp)); return Fetch.empty(); - }).doAnswer(invocation -> - Fetch.forPartition(tp, records, true) - ).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + }).doAnswer(invocation -> { + return Fetch.forPartition(tp, records, true); + }).when(fetchCollector).collectFetch(any(FetchBuffer.class)); // And then poll for up to 10000ms, which should return 2 records without timing out ConsumerRecords returnedRecords = consumer.poll(Duration.ofMillis(10000)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index 41569b73b076c..bb021fa8b9385 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -27,6 +27,8 @@ import org.apache.kafka.clients.consumer.internals.RequestManagers; import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -34,7 +36,6 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock;