Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
Expand Down Expand Up @@ -90,7 +89,6 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.event.Level;

Expand All @@ -109,7 +107,6 @@
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -1233,8 +1230,8 @@ private void close(Duration timeout, boolean swallowException) {
clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis()));
closeTimer.update();
// Prepare shutting down the network thread
prepareShutdown(closeTimer, firstException);
closeTimer.update();
swallow(log, Level.ERROR, "Failed to release assignment before closing consumer",
() -> releaseAssignmentAndLeaveGroup(closeTimer), firstException);
swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback.",
() -> awaitPendingAsyncCommitsAndExecuteCommitCallbacks(closeTimer, false), firstException);
if (applicationEventHandler != null)
Expand Down Expand Up @@ -1266,25 +1263,34 @@ private void close(Duration timeout, boolean swallowException) {
/**
* Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
* 1. autocommit offsets
* 2. revoke all partitions
* 3. if partition revocation completes successfully, send leave group
* 2. release assignment. This is done via a background unsubscribe event that will
* trigger the callbacks, clear the assignment on the subscription state and send the leave group request to the broker
*/
void prepareShutdown(final Timer timer, final AtomicReference<Throwable> firstException) {
private void releaseAssignmentAndLeaveGroup(final Timer timer) {
if (!groupMetadata.get().isPresent())
return;

if (autoCommitEnabled)
autoCommitSync(timer);
commitSyncAllConsumed(timer);

applicationEventHandler.add(new CommitOnCloseEvent());
completeQuietly(() -> maybeRevokePartitions(),
"Failed to execute callback to release assignment", firstException);
completeQuietly(() -> applicationEventHandler.addAndGet(new LeaveOnCloseEvent(calculateDeadlineMs(timer))),
"Failed to send leave group heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException);

log.info("Releasing assignment and leaving group before closing consumer");
UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(calculateDeadlineMs(timer));
applicationEventHandler.add(unsubscribeEvent);
try {
processBackgroundEvents(unsubscribeEvent.future(), timer);
log.info("Completed releasing assignment and sending leave group to close consumer");
} catch (TimeoutException e) {
log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't " +
"complete it within {} ms. It will proceed to close.", timer.timeoutMs());
} finally {
timer.update();
}
}

// Visible for testing
void autoCommitSync(final Timer timer) {
void commitSyncAllConsumed(final Timer timer) {
Map<TopicPartition, OffsetAndMetadata> allConsumed = subscriptions.allConsumed();
log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed);
try {
Expand All @@ -1296,37 +1302,6 @@ void autoCommitSync(final Timer timer) {
timer.update();
}

// Visible for testing
void maybeRevokePartitions() {
if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty())
return;
try {
SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR);
droppedPartitions.addAll(subscriptions.assignedPartitions());
if (subscriptions.rebalanceListener().isPresent())
subscriptions.rebalanceListener().get().onPartitionsRevoked(droppedPartitions);
} catch (Exception e) {
throw new KafkaException(e);
} finally {
subscriptions.assignFromSubscribed(Collections.emptySet());
}
}

// Visible for testing
void completeQuietly(final Utils.ThrowingRunnable function,
final String msg,
final AtomicReference<Throwable> firstException) {
try {
function.run();
} catch (TimeoutException e) {
log.debug("Timeout expired before the {} operation could complete.", msg);
firstException.compareAndSet(null, e);
} catch (Exception e) {
log.error(msg, e);
firstException.compareAndSet(null, e);
}
}

@Override
public void wakeup() {
wakeupTrigger.wakeup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,7 @@ private void reapExpiredApplicationEvents(long currentTimeMs) {
*/
// Visible for testing
static void runAtClose(final Collection<Optional<? extends RequestManager>> requestManagers,
final NetworkClientDelegate networkClientDelegate,
final Timer timer) {
final NetworkClientDelegate networkClientDelegate) {
// These are the optional outgoing requests at the
requestManagers.stream()
.filter(Optional::isPresent)
Expand Down Expand Up @@ -300,15 +299,21 @@ private void sendUnsentRequests(final Timer timer) {
networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
timer.update();
} while (timer.notExpired() && networkClientDelegate.hasAnyPendingRequests());

if (networkClientDelegate.hasAnyPendingRequests()) {
log.warn("Close timeout of {} ms expired before the consumer network thread was able " +
"to complete pending requests. Inflight request count: {}, Unsent request count: {}",
timer.timeoutMs(), networkClientDelegate.inflightRequestCount(), networkClientDelegate.unsentRequests().size());
}
}

void cleanup() {
log.trace("Closing the consumer network thread");
Timer timer = time.timer(closeTimeout);
try {
runAtClose(requestManagers.entries(), networkClientDelegate, timer);
runAtClose(requestManagers.entries(), networkClientDelegate);
} catch (Exception e) {
log.error("Unexpected error during shutdown. Proceed with closing.", e);
log.error("Unexpected error during shutdown. Proceed with closing.", e);
} finally {
sendUnsentRequests(timer);
applicationEventReaper.reap(applicationEventQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ Queue<UnsentRequest> unsentRequests() {
return unsentRequests;
}

public int inflightRequestCount() {
return client.inFlightRequestCount();
}

/**
* Check if the node is disconnected and unavailable for immediate reconnection (i.e. if it is in
* reconnect backoff window following the disconnect).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public enum Type {
COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE,
UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
COMMIT_ON_CLOSE, LEAVE_ON_CLOSE
COMMIT_ON_CLOSE
}

private final Type type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -119,10 +118,6 @@ public void process(ApplicationEvent event) {
process((CommitOnCloseEvent) event);
return;

case LEAVE_ON_CLOSE:
process((LeaveOnCloseEvent) event);
return;

default:
log.warn("Application event type " + event.type() + " was not expected");
}
Expand Down Expand Up @@ -268,20 +263,6 @@ private void process(@SuppressWarnings("unused") final CommitOnCloseEvent event)
requestManagers.commitRequestManager.get().signalClose();
}

private void process(final LeaveOnCloseEvent event) {
if (!requestManagers.heartbeatRequestManager.isPresent()) {
event.future().complete(null);
return;
}
MembershipManager membershipManager =
Objects.requireNonNull(requestManagers.heartbeatRequestManager.get().membershipManager(), "Expecting " +
"membership manager to be non-null");
log.debug("Leaving group before closing");
CompletableFuture<Void> future = membershipManager.leaveGroup();
// The future will be completed on heartbeat sent
future.whenComplete(complete(event.future()));
}

private <T> BiConsumer<? super T, ? super Throwable> complete(final CompletableFuture<T> b) {
return (value, exception) -> {
if (exception != null)
Expand Down

This file was deleted.

Loading