diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index cb2c8aa1048cc..6622cb286d9f1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -53,7 +53,6 @@ import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.events.EventProcessor; import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; -import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; import org.apache.kafka.clients.consumer.internals.events.PollEvent; @@ -90,7 +89,6 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; -import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.event.Level; @@ -109,7 +107,6 @@ import java.util.OptionalLong; import java.util.Set; import java.util.SortedSet; -import java.util.TreeSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; @@ -1233,8 +1230,8 @@ private void close(Duration timeout, boolean swallowException) { clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis())); closeTimer.update(); // Prepare shutting down the network thread - prepareShutdown(closeTimer, firstException); - closeTimer.update(); + swallow(log, Level.ERROR, "Failed to release assignment before closing consumer", + () -> releaseAssignmentAndLeaveGroup(closeTimer), firstException); swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback.", () -> awaitPendingAsyncCommitsAndExecuteCommitCallbacks(closeTimer, false), firstException); if (applicationEventHandler != null) @@ -1266,25 +1263,34 @@ private void close(Duration timeout, boolean swallowException) { /** * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: * 1. autocommit offsets - * 2. revoke all partitions - * 3. if partition revocation completes successfully, send leave group + * 2. release assignment. This is done via a background unsubscribe event that will + * trigger the callbacks, clear the assignment on the subscription state and send the leave group request to the broker */ - void prepareShutdown(final Timer timer, final AtomicReference firstException) { + private void releaseAssignmentAndLeaveGroup(final Timer timer) { if (!groupMetadata.get().isPresent()) return; if (autoCommitEnabled) - autoCommitSync(timer); + commitSyncAllConsumed(timer); applicationEventHandler.add(new CommitOnCloseEvent()); - completeQuietly(() -> maybeRevokePartitions(), - "Failed to execute callback to release assignment", firstException); - completeQuietly(() -> applicationEventHandler.addAndGet(new LeaveOnCloseEvent(calculateDeadlineMs(timer))), - "Failed to send leave group heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException); + + log.info("Releasing assignment and leaving group before closing consumer"); + UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(calculateDeadlineMs(timer)); + applicationEventHandler.add(unsubscribeEvent); + try { + processBackgroundEvents(unsubscribeEvent.future(), timer); + log.info("Completed releasing assignment and sending leave group to close consumer"); + } catch (TimeoutException e) { + log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't " + + "complete it within {} ms. It will proceed to close.", timer.timeoutMs()); + } finally { + timer.update(); + } } // Visible for testing - void autoCommitSync(final Timer timer) { + void commitSyncAllConsumed(final Timer timer) { Map allConsumed = subscriptions.allConsumed(); log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed); try { @@ -1296,37 +1302,6 @@ void autoCommitSync(final Timer timer) { timer.update(); } - // Visible for testing - void maybeRevokePartitions() { - if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty()) - return; - try { - SortedSet droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR); - droppedPartitions.addAll(subscriptions.assignedPartitions()); - if (subscriptions.rebalanceListener().isPresent()) - subscriptions.rebalanceListener().get().onPartitionsRevoked(droppedPartitions); - } catch (Exception e) { - throw new KafkaException(e); - } finally { - subscriptions.assignFromSubscribed(Collections.emptySet()); - } - } - - // Visible for testing - void completeQuietly(final Utils.ThrowingRunnable function, - final String msg, - final AtomicReference firstException) { - try { - function.run(); - } catch (TimeoutException e) { - log.debug("Timeout expired before the {} operation could complete.", msg); - firstException.compareAndSet(null, e); - } catch (Exception e) { - log.error(msg, e); - firstException.compareAndSet(null, e); - } - } - @Override public void wakeup() { wakeupTrigger.wakeup(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index 7616ac6912289..64bba14837aaa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -207,8 +207,7 @@ private void reapExpiredApplicationEvents(long currentTimeMs) { */ // Visible for testing static void runAtClose(final Collection> requestManagers, - final NetworkClientDelegate networkClientDelegate, - final Timer timer) { + final NetworkClientDelegate networkClientDelegate) { // These are the optional outgoing requests at the requestManagers.stream() .filter(Optional::isPresent) @@ -300,15 +299,21 @@ private void sendUnsentRequests(final Timer timer) { networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs()); timer.update(); } while (timer.notExpired() && networkClientDelegate.hasAnyPendingRequests()); + + if (networkClientDelegate.hasAnyPendingRequests()) { + log.warn("Close timeout of {} ms expired before the consumer network thread was able " + + "to complete pending requests. Inflight request count: {}, Unsent request count: {}", + timer.timeoutMs(), networkClientDelegate.inflightRequestCount(), networkClientDelegate.unsentRequests().size()); + } } void cleanup() { log.trace("Closing the consumer network thread"); Timer timer = time.timer(closeTimeout); try { - runAtClose(requestManagers.entries(), networkClientDelegate, timer); + runAtClose(requestManagers.entries(), networkClientDelegate); } catch (Exception e) { - log.error("Unexpected error during shutdown. Proceed with closing.", e); + log.error("Unexpected error during shutdown. Proceed with closing.", e); } finally { sendUnsentRequests(timer); applicationEventReaper.reap(applicationEventQueue); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index d069a0d1fb64b..83015acd89af0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -81,6 +81,10 @@ Queue unsentRequests() { return unsentRequests; } + public int inflightRequestCount() { + return client.inFlightRequestCount(); + } + /** * Check if the node is disconnected and unavailable for immediate reconnection (i.e. if it is in * reconnect backoff window following the disconnect). diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index 2897117da8bab..ae5f7faeaefd4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -31,7 +31,7 @@ public enum Type { COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE, LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE, UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED, - COMMIT_ON_CLOSE, LEAVE_ON_CLOSE + COMMIT_ON_CLOSE } private final Type type; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 7ee0c09d40df9..9d0062f546fb8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import java.util.function.Supplier; @@ -119,10 +118,6 @@ public void process(ApplicationEvent event) { process((CommitOnCloseEvent) event); return; - case LEAVE_ON_CLOSE: - process((LeaveOnCloseEvent) event); - return; - default: log.warn("Application event type " + event.type() + " was not expected"); } @@ -268,20 +263,6 @@ private void process(@SuppressWarnings("unused") final CommitOnCloseEvent event) requestManagers.commitRequestManager.get().signalClose(); } - private void process(final LeaveOnCloseEvent event) { - if (!requestManagers.heartbeatRequestManager.isPresent()) { - event.future().complete(null); - return; - } - MembershipManager membershipManager = - Objects.requireNonNull(requestManagers.heartbeatRequestManager.get().membershipManager(), "Expecting " + - "membership manager to be non-null"); - log.debug("Leaving group before closing"); - CompletableFuture future = membershipManager.leaveGroup(); - // The future will be completed on heartbeat sent - future.whenComplete(complete(event.future())); - } - private BiConsumer complete(final CompletableFuture b) { return (value, exception) -> { if (exception != null) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java deleted file mode 100644 index 647265a1500c8..0000000000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.consumer.internals.events; - -public class LeaveOnCloseEvent extends CompletableApplicationEvent { - - public LeaveOnCloseEvent(final long deadlineMs) { - super(Type.LEAVE_ON_CLOSE, deadlineMs); - } -} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index c5d0a797e1c89..09efcbbad98f5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -42,7 +42,6 @@ import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.events.EventProcessor; import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; -import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; import org.apache.kafka.clients.consumer.internals.events.PollEvent; @@ -108,7 +107,6 @@ import java.util.stream.Stream; import static java.util.Arrays.asList; -import static java.util.Collections.emptySet; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED; @@ -131,7 +129,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -139,6 +136,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.Mockito.clearInvocations; @@ -241,6 +239,7 @@ private AsyncKafkaConsumer newConsumer( @Test public void testSuccessfulStartupShutdown() { consumer = newConsumer(); + completeUnsubscribeApplicationEventSuccessfully(); assertDoesNotThrow(() -> consumer.close()); } @@ -253,6 +252,7 @@ public void testInvalidGroupId() { @Test public void testFailOnClosedConsumer() { consumer = newConsumer(); + completeUnsubscribeApplicationEventSuccessfully(); consumer.close(); final IllegalStateException res = assertThrows(IllegalStateException.class, consumer::assignment); assertEquals("This consumer has already been closed.", res.getMessage()); @@ -945,6 +945,7 @@ public void testEnsurePollExecutedCommitAsyncCallbacks() { @Test public void testEnsureShutdownExecutedCommitAsyncCallbacks() { consumer = newConsumer(); + completeUnsubscribeApplicationEventSuccessfully(); MockCommitCallback callback = new MockCommitCallback(); completeCommitAsyncApplicationEventSuccessfully(); assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); @@ -956,68 +957,45 @@ public void testEnsureShutdownExecutedCommitAsyncCallbacks() { @Test public void testVerifyApplicationEventOnShutdown() { consumer = newConsumer(); + completeUnsubscribeApplicationEventSuccessfully(); doReturn(null).when(applicationEventHandler).addAndGet(any()); consumer.close(); - verify(applicationEventHandler).addAndGet(any(LeaveOnCloseEvent.class)); + verify(applicationEventHandler).add(any(UnsubscribeEvent.class)); verify(applicationEventHandler).add(any(CommitOnCloseEvent.class)); } @Test - public void testPartitionRevocationOnClose() { - MockRebalanceListener listener = new MockRebalanceListener(); - SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); - consumer = newConsumer( + public void testUnsubscribeOnClose() { + SubscriptionState subscriptions = mock(SubscriptionState.class); + consumer = spy(newConsumer( mock(FetchBuffer.class), mock(ConsumerInterceptors.class), mock(ConsumerRebalanceListenerInvoker.class), subscriptions, "group-id", - "client-id"); - - consumer.subscribe(singleton("topic"), listener); - subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0))); + "client-id")); + completeUnsubscribeApplicationEventSuccessfully(); consumer.close(Duration.ZERO); - assertTrue(subscriptions.assignedPartitions().isEmpty()); - assertEquals(1, listener.revokedCount); + verifyUnsubscribeEvent(subscriptions); } @Test public void testFailedPartitionRevocationOnClose() { // If rebalance listener failed to execute during close, we still send the leave group, // and proceed with closing the consumer. - ConsumerRebalanceListener listener = mock(ConsumerRebalanceListener.class); - SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); - consumer = newConsumer( + SubscriptionState subscriptions = mock(SubscriptionState.class); + consumer = spy(newConsumer( mock(FetchBuffer.class), new ConsumerInterceptors<>(Collections.emptyList()), mock(ConsumerRebalanceListenerInvoker.class), subscriptions, "group-id", - "client-id"); - subscriptions.subscribe(singleton("topic"), Optional.of(listener)); - TopicPartition tp = new TopicPartition("topic", 0); - subscriptions.assignFromSubscribed(singleton(tp)); - doThrow(new KafkaException()).when(listener).onPartitionsRevoked(eq(singleton(tp))); + "client-id")); + doThrow(new KafkaException()).when(consumer).processBackgroundEvents(any(), any()); assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO)); - verify(applicationEventHandler).addAndGet(any(LeaveOnCloseEvent.class)); - verify(listener).onPartitionsRevoked(eq(singleton(tp))); - assertEquals(emptySet(), subscriptions.assignedPartitions()); - } - - @Test - public void testCompleteQuietly() { - AtomicReference exception = new AtomicReference<>(); - CompletableFuture future = CompletableFuture.completedFuture(null); - consumer = newConsumer(); - assertDoesNotThrow(() -> consumer.completeQuietly(() -> { - future.get(0, TimeUnit.MILLISECONDS); - }, "test", exception)); - assertNull(exception.get()); - - assertDoesNotThrow(() -> consumer.completeQuietly(() -> { - throw new KafkaException("Test exception"); - }, "test", exception)); - assertInstanceOf(KafkaException.class, exception.get()); + verifyUnsubscribeEvent(subscriptions); + // Close operation should carry on even if the unsubscribe fails + verify(applicationEventHandler).close(any(Duration.class)); } @Test @@ -1034,7 +1012,7 @@ public void testAutoCommitSyncEnabled() { consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class)); subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0))); subscriptions.seek(new TopicPartition("topic", 0), 100); - consumer.autoCommitSync(time.timer(100)); + consumer.commitSyncAllConsumed(time.timer(100)); verify(applicationEventHandler).add(any(SyncCommitEvent.class)); } @@ -1318,7 +1296,7 @@ public void testNoWakeupInCloseCommit() { } return null; }).when(applicationEventHandler).add(any()); - + completeUnsubscribeApplicationEventSuccessfully(); consumer.close(Duration.ZERO); // A commit was triggered and not completed exceptionally by the wakeup @@ -1356,6 +1334,7 @@ public void testCloseAwaitPendingAsyncCommitComplete() { completeCommitAsyncApplicationEventSuccessfully(); consumer.commitAsync(cb); + completeUnsubscribeApplicationEventSuccessfully(); assertDoesNotThrow(() -> consumer.close(Duration.ofMillis(10))); assertEquals(1, cb.invoked); } @@ -1370,6 +1349,7 @@ public void testInterceptorAutoCommitOnClose() { consumer = newConsumer(props); assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); completeCommitSyncApplicationEventSuccessfully(); + completeUnsubscribeApplicationEventSuccessfully(); consumer.close(Duration.ZERO); @@ -2074,6 +2054,7 @@ public void testPollThrowsInterruptExceptionIfInterrupted() { @Test void testReaperInvokedInClose() { consumer = newConsumer(); + completeUnsubscribeApplicationEventSuccessfully(); consumer.close(); verify(backgroundEventReaper).reap(backgroundEventQueue); } @@ -2095,6 +2076,18 @@ void testReaperInvokedInPoll() { verify(backgroundEventReaper).reap(time.milliseconds()); } + private void verifyUnsubscribeEvent(SubscriptionState subscriptions) { + // Check that an unsubscribe event was generated, and that the consumer waited for it to + // complete processing background events. + verify(applicationEventHandler).add(any(UnsubscribeEvent.class)); + verify(consumer).processBackgroundEvents(any(), any()); + + // The consumer should not clear the assignment in the app thread. The unsubscribe + // event is the one responsible for updating the assignment in the background when it + // completes. + verify(subscriptions, never()).assignFromSubscribed(any()); + } + private Map mockTopicPartitionOffset() { final TopicPartition t0 = new TopicPartition("t0", 2); final TopicPartition t1 = new TopicPartition("t0", 3); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java index 71a267b524612..204f87e1c1ff5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java @@ -370,7 +370,7 @@ public void testFetcherCloseClosesFetchSessionsInBroker() { // NOTE: by design the FetchRequestManager doesn't perform network I/O internally. That means that calling // the close() method with a Timer will NOT send out the close session requests on close. The network // I/O logic is handled inside ConsumerNetworkThread.runAtClose, so we need to run that logic here. - ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), networkClientDelegate, timer); + ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), networkClientDelegate); // the network is polled during the last state of clean up. networkClientDelegate.poll(time.timer(1)); // validate that closing the fetcher has sent a request with final epoch. 2 requests are sent, one for the diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index 451743ae2ad83..bb021fa8b9385 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -36,14 +36,10 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; public class ApplicationEventProcessorTest { private final Time time = new MockTime(1); @@ -90,16 +86,6 @@ public void testPrepClosingCommitEvents() { verify(commitRequestManager).signalClose(); } - @Test - public void testPrepClosingLeaveGroupEvent() { - LeaveOnCloseEvent event = new LeaveOnCloseEvent(calculateDeadlineMs(time, 100)); - when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager); - when(membershipManager.leaveGroup()).thenReturn(CompletableFuture.completedFuture(null)); - processor.process(event); - verify(membershipManager).leaveGroup(); - assertTrue(event.future().isDone()); - } - private List mockCommitResults() { return Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class)); }