Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
Expand Down Expand Up @@ -90,7 +89,6 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.event.Level;

Expand All @@ -109,7 +107,6 @@
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -1234,8 +1231,8 @@ private void close(Duration timeout, boolean swallowException) {
clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis()));
closeTimer.update();
// Prepare shutting down the network thread
releaseAssignmentAndLeaveGroup(closeTimer, firstException);
closeTimer.update();
swallow(log, Level.ERROR, "Failed to release assignment before closing consumer",
() -> releaseAssignmentAndLeaveGroup(closeTimer), firstException);
swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback.",
() -> awaitPendingAsyncCommitsAndExecuteCommitCallbacks(closeTimer, false), firstException);
if (applicationEventHandler != null)
Expand Down Expand Up @@ -1267,21 +1264,30 @@ private void close(Duration timeout, boolean swallowException) {
/**
* Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
* 1. autocommit offsets
* 2. revoke all partitions
* 3. if partition revocation completes successfully, send leave group
* 2. release assignment. This is done via a background unsubscribe event that will
* trigger the callbacks, clear the assignment on the subscription state and send the leave group request to the broker
*/
void releaseAssignmentAndLeaveGroup(final Timer timer, final AtomicReference<Throwable> firstException) {
private void releaseAssignmentAndLeaveGroup(final Timer timer) {
if (!groupMetadata.get().isPresent())
return;

if (autoCommitEnabled)
commitSyncAllConsumed(timer);

applicationEventHandler.add(new CommitOnCloseEvent());
completeQuietly(() -> maybeRevokePartitions(),
"Failed to execute callback to release assignment", firstException);
completeQuietly(() -> applicationEventHandler.addAndGet(new LeaveOnCloseEvent(calculateDeadlineMs(timer))),
"Failed to send leave group heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException);

log.info("Releasing assignment and leaving group before closing consumer");
UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(calculateDeadlineMs(timer));
applicationEventHandler.add(unsubscribeEvent);
try {
processBackgroundEvents(unsubscribeEvent.future(), timer);
log.info("Completed releasing assignment and sending leave group to close consumer");
} catch (TimeoutException e) {
log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't " +
"complete it within {} ms. It will proceed to close.", timer.timeoutMs());
} finally {
timer.update();
}
}

// Visible for testing
Expand All @@ -1297,37 +1303,6 @@ void commitSyncAllConsumed(final Timer timer) {
timer.update();
}

// Visible for testing
void maybeRevokePartitions() {
if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty())
return;
try {
SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR);
droppedPartitions.addAll(subscriptions.assignedPartitions());
if (subscriptions.rebalanceListener().isPresent())
subscriptions.rebalanceListener().get().onPartitionsRevoked(droppedPartitions);
} catch (Exception e) {
throw new KafkaException(e);
} finally {
subscriptions.assignFromSubscribed(Collections.emptySet());
}
}

// Visible for testing
void completeQuietly(final Utils.ThrowingRunnable function,
final String msg,
final AtomicReference<Throwable> firstException) {
try {
function.run();
} catch (TimeoutException e) {
log.debug("Timeout expired before the {} operation could complete.", msg);
firstException.compareAndSet(null, e);
} catch (Exception e) {
log.error(msg, e);
firstException.compareAndSet(null, e);
}
}

@Override
public void wakeup() {
wakeupTrigger.wakeup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public enum Type {
COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE,
UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
COMMIT_ON_CLOSE, LEAVE_ON_CLOSE
COMMIT_ON_CLOSE
}

private final Type type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -119,10 +118,6 @@ public void process(ApplicationEvent event) {
process((CommitOnCloseEvent) event);
return;

case LEAVE_ON_CLOSE:
process((LeaveOnCloseEvent) event);
return;

default:
log.warn("Application event type " + event.type() + " was not expected");
}
Expand Down Expand Up @@ -268,20 +263,6 @@ private void process(@SuppressWarnings("unused") final CommitOnCloseEvent event)
requestManagers.commitRequestManager.get().signalClose();
}

private void process(final LeaveOnCloseEvent event) {
if (!requestManagers.heartbeatRequestManager.isPresent()) {
event.future().complete(null);
return;
}
MembershipManager membershipManager =
Objects.requireNonNull(requestManagers.heartbeatRequestManager.get().membershipManager(), "Expecting " +
"membership manager to be non-null");
log.debug("Leaving group before closing");
CompletableFuture<Void> future = membershipManager.leaveGroup();
// The future will be completed on heartbeat sent
future.whenComplete(complete(event.future()));
}

private <T> BiConsumer<? super T, ? super Throwable> complete(final CompletableFuture<T> b) {
return (value, exception) -> {
if (exception != null)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
Expand Down Expand Up @@ -108,7 +107,6 @@
import java.util.stream.Stream;

import static java.util.Arrays.asList;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED;
Expand All @@ -131,14 +129,14 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.clearInvocations;
Expand Down Expand Up @@ -240,6 +238,7 @@ private AsyncKafkaConsumer<String, String> newConsumer(
@Test
public void testSuccessfulStartupShutdown() {
consumer = newConsumer();
completeUnsubscribeApplicationEventSuccessfully();
assertDoesNotThrow(() -> consumer.close());
}

Expand All @@ -252,6 +251,7 @@ public void testInvalidGroupId() {
@Test
public void testFailOnClosedConsumer() {
consumer = newConsumer();
completeUnsubscribeApplicationEventSuccessfully();
consumer.close();
final IllegalStateException res = assertThrows(IllegalStateException.class, consumer::assignment);
assertEquals("This consumer has already been closed.", res.getMessage());
Expand Down Expand Up @@ -799,6 +799,7 @@ public void testEnsurePollExecutedCommitAsyncCallbacks() {
@Test
public void testEnsureShutdownExecutedCommitAsyncCallbacks() {
consumer = newConsumer();
completeUnsubscribeApplicationEventSuccessfully();
MockCommitCallback callback = new MockCommitCallback();
completeCommitAsyncApplicationEventSuccessfully();
assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
Expand All @@ -810,66 +811,45 @@ public void testEnsureShutdownExecutedCommitAsyncCallbacks() {
@Test
public void testVerifyApplicationEventOnShutdown() {
consumer = newConsumer();
completeUnsubscribeApplicationEventSuccessfully();
doReturn(null).when(applicationEventHandler).addAndGet(any());
consumer.close();
verify(applicationEventHandler).addAndGet(any(LeaveOnCloseEvent.class));
verify(applicationEventHandler).add(any(UnsubscribeEvent.class));
verify(applicationEventHandler).add(any(CommitOnCloseEvent.class));
}

@Test
public void testPartitionRevocationOnClose() {
MockRebalanceListener listener = new MockRebalanceListener();
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
consumer = newConsumer(
public void testUnsubscribeOnClose() {
SubscriptionState subscriptions = mock(SubscriptionState.class);
consumer = spy(newConsumer(
mock(FetchBuffer.class),
mock(ConsumerInterceptors.class),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
"client-id");

consumer.subscribe(singleton("topic"), listener);
subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0)));
"client-id"));
completeUnsubscribeApplicationEventSuccessfully();
consumer.close(Duration.ZERO);
assertTrue(subscriptions.assignedPartitions().isEmpty());
assertEquals(1, listener.revokedCount);
verifyUnsubscribeEvent(subscriptions);
}

@Test
public void testFailedPartitionRevocationOnClose() {
// If rebalance listener failed to execute during close, we still send the leave group,
// and proceed with closing the consumer.
ConsumerRebalanceListener listener = mock(ConsumerRebalanceListener.class);
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
consumer = newConsumer(
SubscriptionState subscriptions = mock(SubscriptionState.class);
consumer = spy(newConsumer(
mock(FetchBuffer.class),
new ConsumerInterceptors<>(Collections.emptyList()),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
"client-id");
subscriptions.subscribe(singleton("topic"), Optional.of(listener));
TopicPartition tp = new TopicPartition("topic", 0);
subscriptions.assignFromSubscribed(singleton(tp));
doThrow(new KafkaException()).when(listener).onPartitionsRevoked(eq(singleton(tp)));
"client-id"));
doThrow(new KafkaException()).when(consumer).processBackgroundEvents(any(), any());
assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO));
verify(applicationEventHandler).addAndGet(any(LeaveOnCloseEvent.class));
verify(listener).onPartitionsRevoked(eq(singleton(tp)));
assertEquals(emptySet(), subscriptions.assignedPartitions());
}

@Test
public void testCompleteQuietly() {
AtomicReference<Throwable> exception = new AtomicReference<>();
CompletableFuture<Object> future = CompletableFuture.completedFuture(null);
consumer = newConsumer();
assertDoesNotThrow(() -> consumer.completeQuietly(() -> future.get(0, TimeUnit.MILLISECONDS), "test", exception));
assertNull(exception.get());

assertDoesNotThrow(() -> consumer.completeQuietly(() -> {
throw new KafkaException("Test exception");
}, "test", exception));
assertInstanceOf(KafkaException.class, exception.get());
verifyUnsubscribeEvent(subscriptions);
// Close operation should carry on even if the unsubscribe fails
verify(applicationEventHandler).close(any(Duration.class));
}

@Test
Expand Down Expand Up @@ -1170,7 +1150,7 @@ public void testNoWakeupInCloseCommit() {
}
return null;
}).when(applicationEventHandler).add(any());

completeUnsubscribeApplicationEventSuccessfully();
consumer.close(Duration.ZERO);

// A commit was triggered and not completed exceptionally by the wakeup
Expand Down Expand Up @@ -1208,6 +1188,7 @@ public void testCloseAwaitPendingAsyncCommitComplete() {
completeCommitAsyncApplicationEventSuccessfully();
consumer.commitAsync(cb);

completeUnsubscribeApplicationEventSuccessfully();
assertDoesNotThrow(() -> consumer.close(Duration.ofMillis(10)));
assertEquals(1, cb.invoked);
}
Expand All @@ -1222,6 +1203,7 @@ public void testInterceptorAutoCommitOnClose() {
consumer = newConsumer(props);
assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
completeCommitSyncApplicationEventSuccessfully();
completeUnsubscribeApplicationEventSuccessfully();

consumer.close(Duration.ZERO);

Expand Down Expand Up @@ -1926,6 +1908,7 @@ public void testPollThrowsInterruptExceptionIfInterrupted() {
@Test
void testReaperInvokedInClose() {
consumer = newConsumer();
completeUnsubscribeApplicationEventSuccessfully();
consumer.close();
verify(backgroundEventReaper).reap(backgroundEventQueue);
}
Expand All @@ -1947,6 +1930,18 @@ void testReaperInvokedInPoll() {
verify(backgroundEventReaper).reap(time.milliseconds());
}

private void verifyUnsubscribeEvent(SubscriptionState subscriptions) {
// Check that an unsubscribe event was generated, and that the consumer waited for it to
// complete processing background events.
verify(applicationEventHandler).add(any(UnsubscribeEvent.class));
verify(consumer).processBackgroundEvents(any(), any());

// The consumer should not clear the assignment in the app thread. The unsubscribe
// event is the one responsible for updating the assignment in the background when it
// completes.
verify(subscriptions, never()).assignFromSubscribed(any());
}

private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
final TopicPartition t0 = new TopicPartition("t0", 2);
final TopicPartition t1 = new TopicPartition("t0", 3);
Expand Down
Loading