Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
Expand Down Expand Up @@ -83,7 +85,9 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.event.Level;

import java.net.InetSocketAddress;
import java.time.Duration;
Expand All @@ -94,10 +98,12 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
Expand All @@ -124,6 +130,7 @@
import static org.apache.kafka.common.utils.Utils.closeQuietly;
import static org.apache.kafka.common.utils.Utils.isBlank;
import static org.apache.kafka.common.utils.Utils.join;
import static org.apache.kafka.common.utils.Utils.swallow;

/**
* This {@link Consumer} implementation uses an {@link ApplicationEventHandler event handler} to process
Expand Down Expand Up @@ -245,7 +252,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {

private final ApplicationEventHandler applicationEventHandler;
private final Time time;
private Optional<ConsumerGroupMetadata> groupMetadata;
private Optional<ConsumerGroupMetadata> groupMetadata = Optional.empty();
private final KafkaConsumerMetrics kafkaConsumerMetrics;
private Logger log;
private final String clientId;
Expand All @@ -268,6 +275,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
private final Metrics metrics;
private final long retryBackoffMs;
private final int defaultApiTimeoutMs;
private final boolean autoCommitEnabled;
private volatile boolean closed = false;
private final List<ConsumerPartitionAssignor> assignors;
private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
Expand Down Expand Up @@ -313,6 +321,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
GroupRebalanceConfig.ProtocolType.CONSUMER
);
this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
LogContext logContext = createLogContext(config, groupRebalanceConfig);
this.log = logContext.logger(getClass());

Expand Down Expand Up @@ -434,6 +443,51 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
}

// Visible for testing
    /**
     * Test-only constructor: every collaborator is injected directly, so no configuration
     * parsing, network resources, or background threads are created here.
     * <p>
     * Fixed choices made by this constructor (not configurable through these parameters):
     * isolation level is {@code READ_UNCOMMITTED}, interceptors must be non-null,
     * client telemetry is disabled, and group metadata is derived from {@code groupId}
     * with no group-instance id.
     */
    AsyncKafkaConsumer(LogContext logContext,
                       String clientId,
                       Deserializers<K, V> deserializers,
                       FetchBuffer fetchBuffer,
                       FetchCollector<K, V> fetchCollector,
                       ConsumerInterceptors<K, V> interceptors,
                       Time time,
                       ApplicationEventHandler applicationEventHandler,
                       BlockingQueue<BackgroundEvent> backgroundEventQueue,
                       ConsumerRebalanceListenerInvoker rebalanceListenerInvoker,
                       Metrics metrics,
                       SubscriptionState subscriptions,
                       ConsumerMetadata metadata,
                       long retryBackoffMs,
                       int defaultApiTimeoutMs,
                       List<ConsumerPartitionAssignor> assignors,
                       String groupId,
                       boolean autoCommitEnabled) {
        this.log = logContext.logger(getClass());
        this.subscriptions = subscriptions;
        this.clientId = clientId;
        this.fetchBuffer = fetchBuffer;
        this.fetchCollector = fetchCollector;
        this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
        // Interceptors are mandatory here; pass an empty ConsumerInterceptors rather than null.
        this.interceptors = Objects.requireNonNull(interceptors);
        this.time = time;
        this.backgroundEventProcessor = new BackgroundEventProcessor(
                logContext,
                backgroundEventQueue,
                applicationEventHandler,
                rebalanceListenerInvoker
        );
        this.metrics = metrics;
        // No group-instance id in tests; presence of groupId alone decides group membership.
        this.groupMetadata = initializeGroupMetadata(groupId, Optional.empty());
        this.metadata = metadata;
        this.retryBackoffMs = retryBackoffMs;
        this.defaultApiTimeoutMs = defaultApiTimeoutMs;
        this.deserializers = deserializers;
        this.applicationEventHandler = applicationEventHandler;
        this.assignors = assignors;
        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
        this.clientTelemetryReporter = Optional.empty();
        this.autoCommitEnabled = autoCommitEnabled;
    }

AsyncKafkaConsumer(LogContext logContext,
Time time,
ConsumerConfig config,
Expand All @@ -446,6 +500,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
this.log = logContext.logger(getClass());
this.subscriptions = subscriptions;
this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
this.fetchBuffer = new FetchBuffer(logContext);
this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
this.interceptors = new ConsumerInterceptors<>(Collections.emptyList());
Expand Down Expand Up @@ -1159,15 +1214,12 @@ private void close(Duration timeout, boolean swallowException) {
final Timer closeTimer = time.timer(timeout);
clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis()));
closeTimer.update();

// Prepare shutting down the network thread
prepareShutdown(closeTimer, firstException);
closeTimer.update();
if (applicationEventHandler != null)
closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed to close application event handler with a timeout(ms)=" + closeTimer.remainingMs(), firstException);

// Invoke all callbacks after the background thread exists in case if there are unsent async
// commits
maybeInvokeCommitCallbacks();

closeQuietly(fetchBuffer, "Failed to close the fetch buffer", firstException);
closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException);
closeTimer.update();
closeQuietly(interceptors, "consumer interceptors", firstException);
closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException);
closeQuietly(metrics, "consumer metrics", firstException);
Expand All @@ -1185,6 +1237,74 @@ private void close(Duration timeout, boolean swallowException) {
}
}

    /**
     * Releases group-related resources before the network thread is shut down. The operations
     * must happen in this sequence:
     * <ol>
     *     <li>auto-commit offsets (only when auto-commit is enabled)</li>
     *     <li>revoke all assigned partitions</li>
     *     <li>if partition revocation completes successfully, send the leave-group heartbeat</li>
     *     <li>invoke any outstanding asynchronous commit callbacks</li>
     * </ol>
     * This is a no-op when the consumer has no group metadata (i.e. it is not part of a group).
     *
     * @param timer          bounds how long the blocking steps (auto-commit, leave group) may take
     * @param firstException records the first non-timeout failure seen during shutdown
     */
    void prepareShutdown(final Timer timer, final AtomicReference<Throwable> firstException) {
        if (!groupMetadata.isPresent())
            return;
        maybeAutoCommitSync(autoCommitEnabled, timer, firstException);
        // Tell the background thread to drain and send pending offset commits before it closes.
        applicationEventHandler.add(new CommitOnCloseApplicationEvent());
        completeQuietly(
            () -> {
                // Leave group is only sent if revocation did not throw (completeQuietly records failures).
                maybeRevokePartitions();
                applicationEventHandler.addAndGet(new LeaveOnCloseApplicationEvent(), timer);
            },
            "Failed to send leaveGroup heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException);
        // User callback failures are logged but must not abort the remainder of the shutdown.
        swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback.", this::maybeInvokeCommitCallbacks,
            firstException);
    }

// Visible for testing
void maybeAutoCommitSync(final boolean shouldAutoCommit,
final Timer timer,
final AtomicReference<Throwable> firstException) {
if (!shouldAutoCommit)
return;
Map<TopicPartition, OffsetAndMetadata> allConsumed = subscriptions.allConsumed();
log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed);
try {
commitSync(allConsumed, Duration.ofMillis(timer.remainingMs()));
} catch (Exception e) {
// consistent with async auto-commit failures, we do not propagate the exception
log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage());
}
timer.update();
}

// Visible for testing
void maybeRevokePartitions() {
if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty())
return;
try {
SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR);
droppedPartitions.addAll(subscriptions.assignedPartitions());
if (subscriptions.rebalanceListener().isPresent())
subscriptions.rebalanceListener().get().onPartitionsRevoked(droppedPartitions);
} catch (Exception e) {
throw new KafkaException(e);
} finally {
subscriptions.assignFromSubscribed(Collections.emptySet());
}
}

    // Visible for testing
    /**
     * Runs {@code function}, suppressing anything it throws: a {@code TimeoutException} is only
     * logged at debug level (timeouts during close are expected and non-fatal), while any other
     * exception is recorded into {@code firstException} — but only if no earlier failure has
     * already been recorded there.
     *
     * @param msg description of the operation, used in the timeout log message
     */
    void completeQuietly(final Utils.ThrowingRunnable function,
                         final String msg,
                         final AtomicReference<Throwable> firstException) {
        try {
            function.run();
        } catch (TimeoutException e) {
            log.debug("Timeout expired before the {} operation could complete.", msg);
        } catch (Exception e) {
            firstException.compareAndSet(null, e);
        }
    }

@Override
public void wakeup() {
wakeupTrigger.wakeup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class CommitRequestManager implements RequestManager {
private final OptionalDouble jitter;
private final boolean throwOnFetchStableOffsetUnsupported;
final PendingRequests pendingRequests;
private boolean closing = false;

public CommitRequestManager(
final Time time,
Expand Down Expand Up @@ -153,6 +154,10 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
if (!coordinatorRequestManager.coordinator().isPresent())
return EMPTY;

if (closing) {
return drainPendingOffsetCommitRequests();
}

maybeAutoCommitAllConsumed();
if (!pendingRequests.hasUnsentRequests())
return EMPTY;
Expand All @@ -165,6 +170,11 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
return new NetworkClientDelegate.PollResult(timeUntilNextPoll, requests);
}

    @Override
    public void signalClose() {
        // Once set, poll() short-circuits to draining pending offset commits only (see poll()).
        closing = true;
    }

/**
* Returns the delay for which the application thread can safely wait before it should be responsive
* to results from the request managers. For example, the subscription state can change when heartbeats
Expand Down Expand Up @@ -315,12 +325,10 @@ public void resetAutoCommitTimer() {
* Drains the inflight offsetCommits during shutdown because we want to make sure all pending commits are sent
* before closing.
*/
@Override
public NetworkClientDelegate.PollResult pollOnClose() {
if (!pendingRequests.hasUnsentRequests() || !coordinatorRequestManager.coordinator().isPresent())
public NetworkClientDelegate.PollResult drainPendingOffsetCommitRequests() {
if (pendingRequests.unsentOffsetCommits.isEmpty())
return EMPTY;

List<NetworkClientDelegate.UnsentRequest> requests = pendingRequests.drainOnClose();
List<NetworkClientDelegate.UnsentRequest> requests = pendingRequests.drainPendingCommits();
return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, requests);
}

Expand Down Expand Up @@ -785,7 +793,7 @@ private void clearAll() {
unsentOffsetFetches.clear();
}

private List<NetworkClientDelegate.UnsentRequest> drainOnClose() {
private List<NetworkClientDelegate.UnsentRequest> drainPendingCommits() {
ArrayList<NetworkClientDelegate.UnsentRequest> res = new ArrayList<>();
res.addAll(unsentOffsetCommits.stream().map(OffsetCommitRequestState::toUnsentRequest).collect(Collectors.toList()));
clearAll();
Expand Down
Loading