From cb06f95a8252fa06689eb98c416312732d9d93c7 Mon Sep 17 00:00:00 2001 From: Lianet Magrans Date: Fri, 14 Jun 2024 09:42:59 -0400 Subject: [PATCH 1/3] Remove redundant LeaveOnCLose event --- .../internals/events/ApplicationEvent.java | 2 +- .../events/ApplicationEventProcessor.java | 19 --------------- .../internals/events/LeaveOnCloseEvent.java | 24 ------------------- .../events/ApplicationEventProcessorTest.java | 17 ------------- 4 files changed, 1 insertion(+), 61 deletions(-) delete mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index 2897117da8bab..ae5f7faeaefd4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -31,7 +31,7 @@ public enum Type { COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE, LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE, UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED, - COMMIT_ON_CLOSE, LEAVE_ON_CLOSE + COMMIT_ON_CLOSE } private final Type type; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 7ee0c09d40df9..9d0062f546fb8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import java.util.function.Supplier; @@ -119,10 +118,6 @@ public void process(ApplicationEvent event) { process((CommitOnCloseEvent) event); return; - case LEAVE_ON_CLOSE: - process((LeaveOnCloseEvent) event); - return; - default: log.warn("Application event type " + event.type() + " was not expected"); } @@ -268,20 +263,6 @@ private void process(@SuppressWarnings("unused") final CommitOnCloseEvent event) requestManagers.commitRequestManager.get().signalClose(); } - private void process(final LeaveOnCloseEvent event) { - if (!requestManagers.heartbeatRequestManager.isPresent()) { - event.future().complete(null); - return; - } - MembershipManager membershipManager = - Objects.requireNonNull(requestManagers.heartbeatRequestManager.get().membershipManager(), "Expecting " + - "membership manager to be non-null"); - log.debug("Leaving group before closing"); - CompletableFuture future = membershipManager.leaveGroup(); - // The future will be completed on heartbeat sent - future.whenComplete(complete(event.future())); - } - private BiConsumer complete(final CompletableFuture b) { return (value, exception) -> { if (exception != null) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java deleted file mode 100644 index 647265a1500c8..0000000000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.consumer.internals.events; - -public class LeaveOnCloseEvent extends CompletableApplicationEvent { - - public LeaveOnCloseEvent(final long deadlineMs) { - super(Type.LEAVE_ON_CLOSE, deadlineMs); - } -} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index 6a4b9faf479d6..3202266a33a81 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -27,25 +27,18 @@ import org.apache.kafka.clients.consumer.internals.RequestManagers; import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager; import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.common.utils.Time; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; public class ApplicationEventProcessorTest { - private final Time time = new MockTime(1); private final ConsumerMetadata metadata = mock(ConsumerMetadata.class); private ApplicationEventProcessor processor; private CommitRequestManager commitRequestManager; @@ -87,16 +80,6 @@ public void testPrepClosingCommitEvents() { verify(commitRequestManager).signalClose(); } - @Test - public void testPrepClosingLeaveGroupEvent() { - LeaveOnCloseEvent event = new LeaveOnCloseEvent(calculateDeadlineMs(time, 100)); - when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager); - when(membershipManager.leaveGroup()).thenReturn(CompletableFuture.completedFuture(null)); - processor.process(event); - verify(membershipManager).leaveGroup(); - assertTrue(event.future().isDone()); - } - private List mockCommitResults() { return Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class)); } From 6e489e79e44034ebf10c47d9175b89f148823b68 Mon Sep 17 00:00:00 2001 From: Lianet Magrans Date: Fri, 14 Jun 2024 11:07:15 -0400 Subject: [PATCH 2/3] Close leave operations in background --- .../internals/AsyncKafkaConsumer.java | 58 ++++---------- .../internals/AsyncKafkaConsumerTest.java | 75 +++++++++---------- 2 files changed, 50 insertions(+), 83 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 384bce0f6702d..22c2a281c840d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -53,7 +53,6 @@ import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.events.EventProcessor; import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; -import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; import org.apache.kafka.clients.consumer.internals.events.PollEvent; @@ -90,7 +89,6 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; -import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.event.Level; @@ -109,7 +107,6 @@ import java.util.OptionalLong; import java.util.Set; import java.util.SortedSet; -import java.util.TreeSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; @@ -1234,8 +1231,8 @@ private void close(Duration timeout, boolean swallowException) { clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis())); closeTimer.update(); // Prepare shutting down the network thread - releaseAssignmentAndLeaveGroup(closeTimer, firstException); - closeTimer.update(); + swallow(log, Level.ERROR, "Failed to release assignment before closing consumer", + () -> releaseAssignmentAndLeaveGroup(closeTimer), firstException); swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback.", () -> awaitPendingAsyncCommitsAndExecuteCommitCallbacks(closeTimer, false), firstException); if (applicationEventHandler != null) @@ -1267,10 +1264,10 @@ private void close(Duration timeout, boolean swallowException) { /** * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: * 1. autocommit offsets - * 2. revoke all partitions - * 3. if partition revocation completes successfully, send leave group + * 2. release assignment. This is done via a background unsubscribe event that will + * trigger the callbacks, clear the assignment on the subscription state and send the leave group request to the broker */ - void releaseAssignmentAndLeaveGroup(final Timer timer, final AtomicReference firstException) { + private void releaseAssignmentAndLeaveGroup(final Timer timer) { if (!groupMetadata.get().isPresent()) return; @@ -1278,10 +1275,16 @@ void releaseAssignmentAndLeaveGroup(final Timer timer, final AtomicReference maybeRevokePartitions(), - "Failed to execute callback to release assignment", firstException); - completeQuietly(() -> applicationEventHandler.addAndGet(new LeaveOnCloseEvent(calculateDeadlineMs(timer))), - "Failed to send leave group heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException); + + log.info("Releasing assignment and leaving group before closing consumer"); + UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(calculateDeadlineMs(timer)); + applicationEventHandler.add(unsubscribeEvent); + try { + processBackgroundEvents(unsubscribeEvent.future(), timer); + log.info("Completed releasing assignment and sending leave group to close consumer"); + } finally { + timer.update(); + } } // Visible for testing @@ -1297,37 +1300,6 @@ void commitSyncAllConsumed(final Timer timer) { timer.update(); } - // Visible for testing - void maybeRevokePartitions() { - if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty()) - return; - try { - SortedSet droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR); - droppedPartitions.addAll(subscriptions.assignedPartitions()); - if (subscriptions.rebalanceListener().isPresent()) - subscriptions.rebalanceListener().get().onPartitionsRevoked(droppedPartitions); - } catch (Exception e) { - throw new KafkaException(e); - } finally { - subscriptions.assignFromSubscribed(Collections.emptySet()); - } - } - - // Visible for testing - void completeQuietly(final Utils.ThrowingRunnable function, - final String msg, - final AtomicReference firstException) { - try { - function.run(); - } catch (TimeoutException e) { - log.debug("Timeout expired before the {} operation could complete.", msg); - firstException.compareAndSet(null, e); - } catch (Exception e) { - log.error(msg, e); - firstException.compareAndSet(null, e); - } - } - @Override public void wakeup() { wakeupTrigger.wakeup(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 6a4cae657065e..d094f312fda1a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -42,7 +42,6 @@ import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.events.EventProcessor; import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; -import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; import org.apache.kafka.clients.consumer.internals.events.PollEvent; @@ -108,7 +107,6 @@ import java.util.stream.Stream; import static java.util.Arrays.asList; -import static java.util.Collections.emptySet; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED; @@ -131,7 +129,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -139,6 +136,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.Mockito.clearInvocations; @@ -240,6 +238,7 @@ private AsyncKafkaConsumer newConsumer( @Test public void testSuccessfulStartupShutdown() { consumer = newConsumer(); + completeUnsubscribeApplicationEventSuccessfully(); assertDoesNotThrow(() -> consumer.close()); } @@ -252,6 +251,7 @@ public void testInvalidGroupId() { @Test public void testFailOnClosedConsumer() { consumer = newConsumer(); + completeUnsubscribeApplicationEventSuccessfully(); consumer.close(); final IllegalStateException res = assertThrows(IllegalStateException.class, consumer::assignment); assertEquals("This consumer has already been closed.", res.getMessage()); @@ -799,6 +799,7 @@ public void testEnsurePollExecutedCommitAsyncCallbacks() { @Test public void testEnsureShutdownExecutedCommitAsyncCallbacks() { consumer = newConsumer(); + completeUnsubscribeApplicationEventSuccessfully(); MockCommitCallback callback = new MockCommitCallback(); completeCommitAsyncApplicationEventSuccessfully(); assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); @@ -810,66 +811,45 @@ public void testEnsureShutdownExecutedCommitAsyncCallbacks() { @Test public void testVerifyApplicationEventOnShutdown() { consumer = newConsumer(); + completeUnsubscribeApplicationEventSuccessfully(); doReturn(null).when(applicationEventHandler).addAndGet(any()); consumer.close(); - verify(applicationEventHandler).addAndGet(any(LeaveOnCloseEvent.class)); + verify(applicationEventHandler).add(any(UnsubscribeEvent.class)); verify(applicationEventHandler).add(any(CommitOnCloseEvent.class)); } @Test - public void testPartitionRevocationOnClose() { - MockRebalanceListener listener = new MockRebalanceListener(); - SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); - consumer = newConsumer( + public void testUnsubscribeOnClose() { + SubscriptionState subscriptions = mock(SubscriptionState.class); + consumer = spy(newConsumer( mock(FetchBuffer.class), mock(ConsumerInterceptors.class), mock(ConsumerRebalanceListenerInvoker.class), subscriptions, "group-id", - "client-id"); - - consumer.subscribe(singleton("topic"), listener); - subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0))); + "client-id")); + completeUnsubscribeApplicationEventSuccessfully(); consumer.close(Duration.ZERO); - assertTrue(subscriptions.assignedPartitions().isEmpty()); - assertEquals(1, listener.revokedCount); + verifyUnsubscribeEvent(subscriptions); } @Test public void testFailedPartitionRevocationOnClose() { // If rebalance listener failed to execute during close, we still send the leave group, // and proceed with closing the consumer. - ConsumerRebalanceListener listener = mock(ConsumerRebalanceListener.class); - SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); - consumer = newConsumer( + SubscriptionState subscriptions = mock(SubscriptionState.class); + consumer = spy(newConsumer( mock(FetchBuffer.class), new ConsumerInterceptors<>(Collections.emptyList()), mock(ConsumerRebalanceListenerInvoker.class), subscriptions, "group-id", - "client-id"); - subscriptions.subscribe(singleton("topic"), Optional.of(listener)); - TopicPartition tp = new TopicPartition("topic", 0); - subscriptions.assignFromSubscribed(singleton(tp)); - doThrow(new KafkaException()).when(listener).onPartitionsRevoked(eq(singleton(tp))); + "client-id")); + doThrow(new KafkaException()).when(consumer).processBackgroundEvents(any(), any()); assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO)); - verify(applicationEventHandler).addAndGet(any(LeaveOnCloseEvent.class)); - verify(listener).onPartitionsRevoked(eq(singleton(tp))); - assertEquals(emptySet(), subscriptions.assignedPartitions()); - } - - @Test - public void testCompleteQuietly() { - AtomicReference exception = new AtomicReference<>(); - CompletableFuture future = CompletableFuture.completedFuture(null); - consumer = newConsumer(); - assertDoesNotThrow(() -> consumer.completeQuietly(() -> future.get(0, TimeUnit.MILLISECONDS), "test", exception)); - assertNull(exception.get()); - - assertDoesNotThrow(() -> consumer.completeQuietly(() -> { - throw new KafkaException("Test exception"); - }, "test", exception)); - assertInstanceOf(KafkaException.class, exception.get()); + verifyUnsubscribeEvent(subscriptions); + // Close operation should carry on even if the unsubscribe fails + verify(applicationEventHandler).close(any(Duration.class)); } @Test @@ -1170,7 +1150,7 @@ public void testNoWakeupInCloseCommit() { } return null; }).when(applicationEventHandler).add(any()); - + completeUnsubscribeApplicationEventSuccessfully(); consumer.close(Duration.ZERO); // A commit was triggered and not completed exceptionally by the wakeup @@ -1208,6 +1188,7 @@ public void testCloseAwaitPendingAsyncCommitComplete() { completeCommitAsyncApplicationEventSuccessfully(); consumer.commitAsync(cb); + completeUnsubscribeApplicationEventSuccessfully(); assertDoesNotThrow(() -> consumer.close(Duration.ofMillis(10))); assertEquals(1, cb.invoked); } @@ -1222,6 +1203,7 @@ public void testInterceptorAutoCommitOnClose() { consumer = newConsumer(props); assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); completeCommitSyncApplicationEventSuccessfully(); + completeUnsubscribeApplicationEventSuccessfully(); consumer.close(Duration.ZERO); @@ -1926,6 +1908,7 @@ public void testPollThrowsInterruptExceptionIfInterrupted() { @Test void testReaperInvokedInClose() { consumer = newConsumer(); + completeUnsubscribeApplicationEventSuccessfully(); consumer.close(); verify(backgroundEventReaper).reap(backgroundEventQueue); } @@ -1947,6 +1930,18 @@ void testReaperInvokedInPoll() { verify(backgroundEventReaper).reap(time.milliseconds()); } + private void verifyUnsubscribeEvent(SubscriptionState subscriptions) { + // Check that an unsubscribe event was generated, and that the consumer waited for it to + // complete processing background events. + verify(applicationEventHandler).add(any(UnsubscribeEvent.class)); + verify(consumer).processBackgroundEvents(any(), any()); + + // The consumer should not clear the assignment in the app thread. The unsubscribe + // event is the one responsible for updating the assignment in the background when it + // completes. + verify(subscriptions, never()).assignFromSubscribed(any()); + } + private Map mockTopicPartitionOffset() { final TopicPartition t0 = new TopicPartition("t0", 2); final TopicPartition t1 = new TopicPartition("t0", 3); From 67fecfacb5b60160f6c2f40deb0e189bdf290dca Mon Sep 17 00:00:00 2001 From: Lianet Magrans Date: Mon, 17 Jun 2024 09:43:46 -0400 Subject: [PATCH 3/3] Log warn if timeout releasing assignment on close --- .../kafka/clients/consumer/internals/AsyncKafkaConsumer.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 22c2a281c840d..4acf66e5293e0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1282,6 +1282,9 @@ private void releaseAssignmentAndLeaveGroup(final Timer timer) { try { processBackgroundEvents(unsubscribeEvent.future(), timer); log.info("Completed releasing assignment and sending leave group to close consumer"); + } catch (TimeoutException e) { + log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't " + + "complete it within {} ms. It will proceed to close.", timer.timeoutMs()); } finally { timer.update(); }