Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
import org.slf4j.event.Level;

import java.net.InetSocketAddress;
import java.time.Duration;
Expand All @@ -93,10 +94,13 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -119,6 +123,7 @@
import static org.apache.kafka.common.utils.Utils.closeQuietly;
import static org.apache.kafka.common.utils.Utils.isBlank;
import static org.apache.kafka.common.utils.Utils.join;
import static org.apache.kafka.common.utils.Utils.swallow;

/**
* This {@link Consumer} implementation uses an {@link ApplicationEventHandler event handler} to process
Expand Down Expand Up @@ -210,7 +215,7 @@ private void process(final GroupMetadataUpdateEvent event) {

private final ApplicationEventHandler applicationEventHandler;
private final Time time;
private Optional<ConsumerGroupMetadata> groupMetadata;
private Optional<ConsumerGroupMetadata> groupMetadata = Optional.empty();
private final KafkaConsumerMetrics kafkaConsumerMetrics;
private Logger log;
private final String clientId;
Expand All @@ -233,6 +238,7 @@ private void process(final GroupMetadataUpdateEvent event) {
private final Metrics metrics;
private final long retryBackoffMs;
private final int defaultApiTimeoutMs;
private final boolean autoCommitEnabled;
private volatile boolean closed = false;
private final List<ConsumerPartitionAssignor> assignors;
private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
Expand Down Expand Up @@ -264,6 +270,7 @@ private void process(final GroupMetadataUpdateEvent event) {
GroupRebalanceConfig.ProtocolType.CONSUMER
);
this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
LogContext logContext = createLogContext(config, groupRebalanceConfig);
this.log = logContext.logger(getClass());

Expand Down Expand Up @@ -378,7 +385,8 @@ private void process(final GroupMetadataUpdateEvent event) {
long retryBackoffMs,
int defaultApiTimeoutMs,
List<ConsumerPartitionAssignor> assignors,
String groupId) {
String groupId,
boolean autoCommitEnabled) {
this.log = logContext.logger(getClass());
this.subscriptions = subscriptions;
this.clientId = clientId;
Expand All @@ -398,6 +406,7 @@ private void process(final GroupMetadataUpdateEvent event) {
this.assignors = assignors;
this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
this.clientTelemetryReporter = Optional.empty();
this.autoCommitEnabled = autoCommitEnabled;
}

// Visible for testing
Expand All @@ -413,6 +422,7 @@ private void process(final GroupMetadataUpdateEvent event) {
this.log = logContext.logger(getClass());
this.subscriptions = subscriptions;
this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
this.fetchBuffer = new FetchBuffer(logContext);
this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
this.interceptors = new ConsumerInterceptors<>(Collections.emptyList());
Expand Down Expand Up @@ -1024,15 +1034,14 @@ private void close(Duration timeout, boolean swallowException) {
final Timer closeTimer = time.timer(timeout);
clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis()));
closeTimer.update();

// Prepare shutting down the network thread
swallow(log, Level.ERROR, "Unexpected exception when preparing for shutdown", () -> prepareShutdown(closeTimer), firstException);
closeTimer.update();
if (applicationEventHandler != null)
closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed to close application event handler with a timeout(ms)=" + closeTimer.remainingMs(), firstException);

// Invoke all callbacks after the background thread exits in case there are unsent async
// commits
maybeInvokeCommitCallbacks();

closeQuietly(fetchBuffer, "Failed to close the fetch buffer", firstException);
closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException);
closeTimer.update();
// Ensure all async commit callbacks are invoked
swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback", this::maybeInvokeCommitCallbacks, firstException);
Comment on lines +1043 to +1044
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should be done before we shut down the network, right?

closeQuietly(interceptors, "consumer interceptors", firstException);
closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException);
closeQuietly(metrics, "consumer metrics", firstException);
Expand All @@ -1050,6 +1059,57 @@ private void close(Duration timeout, boolean swallowException) {
}
}

/**
 * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
 * 1. autocommit offsets
 * 2. revoke all partitions
 */
private void prepareShutdown(final Timer timer) {
    // Nothing to do if this consumer is not part of a consumer group.
    if (!groupMetadata.isPresent())
        return;

    maybeAutoCommitSync(timer);
    timer.update();
    // Only auto-assigned (subscribe()-based) partitions need to be revoked; manual
    // assign() users get no revocation callbacks.
    if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty())
        return;

    try {
        // If the consumer is in a group, we will pause and revoke all assigned partitions
        onLeavePrepare().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
        timer.update();
    } catch (Exception e) {
        // Unwrap the user callback failure. Keep the cause as a Throwable: an
        // ExecutionException may wrap an Error, so casting the cause to Exception
        // could itself throw ClassCastException and mask the real failure.
        Throwable cause = (e instanceof ExecutionException && e.getCause() != null) ? e.getCause() : e;
        throw new KafkaException("User rebalance callback throws an error", cause);
    } finally {
        // Clear the assignment regardless of the callback outcome so close() can proceed.
        subscriptions.assignFromSubscribed(Collections.emptySet());
    }
}

/**
 * Synchronously commits all consumed offsets before close, but only when auto-commit
 * is enabled. Failures are logged and swallowed, mirroring async auto-commit behavior.
 */
private void maybeAutoCommitSync(final Timer timer) {
    if (!autoCommitEnabled)
        return;

    final Map<TopicPartition, OffsetAndMetadata> offsets = subscriptions.allConsumed();
    try {
        log.debug("Sending synchronous auto-commit of offsets {} on closing", offsets);
        commitSync(offsets, Duration.ofMillis(timer.remainingMs()));
    } catch (Exception e) {
        // consistent with async auto-commit failures, we do not propagate the exception
        log.warn("Synchronous auto-commit of offsets {} failed: {}", offsets, e.getMessage());
    }
}

private CompletableFuture<Void> onLeavePrepare() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May miss a bit of context, but I'm not yet sure what this function is achieving. If this is for KAFKA-15276, maybe we can implement this as part of that ticket, because this function is mostly confusing me.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to speak to kirk about how he wants to implement the callback invocation.

SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR);
droppedPartitions.addAll(subscriptions.assignedPartitions());
if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
// TODO: Invoke rebalanceListener via KAFKA-15276
return CompletableFuture.completedFuture(null);
}

Comment on lines +1062 to +1112
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would imagine that the leave group process could be mostly performed using an event and a callback execution, like in #14931.

We'd need to submit an event to the background thread (e.g. PrepareCloseEvent) so that the member manager can orchestrate the callback request and the heartbeat request.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I mostly agree with your idea. Though - I think simply firing the callback revocation from the close() should be enough - but I think sending leave-group and closing events as you suggested is a good idea.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Several people seem to agree that we should solve this as much as possible via an event. Is the new draft PR going to replace this PR, or should we try to merge this one?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to be consistent with Kirk's approach for unsubscribe() in terms of callback invocation - So I took the previous comment back (sorry about the confusion)

@Override
public void wakeup() {
wakeupTrigger.wakeup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
private ApplicationEventProcessor applicationEventProcessor;
private NetworkClientDelegate networkClientDelegate;
private RequestManagers requestManagers;
private Optional<MembershipManager> membershipManager;
private volatile boolean running;
private final IdempotentCloser closer = new IdempotentCloser();
private volatile Duration closeTimeout = Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS);
Expand Down Expand Up @@ -106,6 +107,7 @@ void initializeResources() {
applicationEventProcessor = applicationEventProcessorSupplier.get();
networkClientDelegate = networkClientDelegateSupplier.get();
requestManagers = requestManagersSupplier.get();
membershipManager = requestManagersSupplier.get().membershipManager;
}

/**
Expand Down Expand Up @@ -143,7 +145,6 @@ void runOnce() {
.map(networkClientDelegate::addAll)
.reduce(MAX_POLL_TIMEOUT_MS, Math::min);
networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);

cachedMaximumTimeToWait = requestManagers.entries().stream()
.filter(Optional::isPresent)
.map(Optional::get)
Expand Down Expand Up @@ -194,11 +195,11 @@ static void runAtClose(final Collection<Optional<? extends RequestManager>> requ

// Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until
// all requests have received a response.
do {
while (timer.notExpired() && !requestFutures.stream().allMatch(Future::isDone)) {
pollWaitTimeMs = Math.min(pollWaitTimeMs, timer.remainingMs());
networkClientDelegate.poll(pollWaitTimeMs, timer.currentTimeMs());
timer.update();
} while (timer.notExpired() && !requestFutures.stream().allMatch(Future::isDone));
}
}

public boolean isRunning() {
Expand Down Expand Up @@ -274,55 +275,58 @@ private void closeInternal(final Duration timeout) {
}

// NOTE(review): the scraped diff flattened old and new lines together (duplicate
// log.trace/Timer declarations, both maybeAutocommitOnClose(timer) and
// prepareForShutdown(timer), both maybeLeaveGroup variants). This is the
// reconstructed post-change version: the timer is hoisted out of the try block so
// the finally clause can bound awaitPendingRequests with the same deadline.
void cleanup() {
    log.trace("Closing the consumer network thread");
    Timer timer = time.timer(closeTimeout);
    try {
        // Auto-commit and revoke partitions before tearing the network down.
        prepareForShutdown(timer);
        // Send out the unsent commits and try to complete them before timer runs out
        runAtClose(requestManagers.entries(), networkClientDelegate, timer);
        maybeLeaveGroup();
    } catch (Exception e) {
        log.error("Unexpected error during shutdown. Proceed with closing.", e);
    } finally {
        // Best-effort flush of in-flight requests, then release all resources.
        networkClientDelegate.awaitPendingRequests(timer);
        closeQuietly(requestManagers, "request managers");
        closeQuietly(networkClientDelegate, "network client delegate");
        closeQuietly(applicationEventProcessor, "application event processor");
        log.debug("Closed the consumer network thread");
    }
}

/**
* We need to autocommit before shutting down the consumer. The method needs to first connect to the coordinator
* node to construct the closing requests. Then wait for all closing requests to finish before returning. The
* method is bounded by a closing timer. We will continue closing down the consumer if the requests cannot be
* completed in time.
*/
// Visible for testing
void maybeAutocommitOnClose(final Timer timer) {
private void prepareForShutdown(final Timer timer) {
if (!requestManagers.coordinatorRequestManager.isPresent())
return;

if (!requestManagers.commitRequestManager.isPresent()) {
log.error("Expecting a CommitRequestManager but the object was never initialized. Shutting down.");
return;
}

if (!requestManagers.commitRequestManager.get().canAutoCommit()) {
return;
}

// We need a coordinator node for the commit request manager to send the remaining commit requests
ensureCoordinatorReady(timer);
NetworkClientDelegate.UnsentRequest autocommitRequest =
requestManagers.commitRequestManager.get().createCommitAllConsumedRequest();
networkClientDelegate.add(autocommitRequest);
do {
long currentTimeMs = timer.currentTimeMs();
ensureCoordinatorReady(timer);
networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);
} while (timer.notExpired() && !autocommitRequest.future().isDone());
}

void maybeLeaveGroup(final Timer timer) {
// TODO: Leave group upon closing the consumer
/**
* Leave the group when the consumer is shutting down.
*/
void maybeLeaveGroup() {
if (!membershipManager.isPresent())
return;

// We do not need to send leave group when the member is unsubscribed or fatal
if (membershipManager.get().shouldSkipHeartbeat())
return;

membershipManager.get().leaveGroupOnClose();
HeartbeatRequestManager hrm = requestManagers.heartbeatRequestManager.orElseThrow(() ->
new IllegalStateException(
"Expecting a GroupHeartbeatRequest but the object was never initialized."));
log.debug("Sending GroupHeartbeatRequest with epoch {} to coordinator {} to leave the group before close",
membershipManager.get().memberEpoch(),
requestManagers.coordinatorRequestManager.get().coordinator());
long nowMs = time.milliseconds();
// Ensure the request gets sent out
networkClientDelegate.addAll(hrm.poll(nowMs));
networkClientDelegate.poll(0, nowMs);
}

private void ensureCoordinatorReady(final Timer timer) {
Expand All @@ -331,7 +335,7 @@ private void ensureCoordinatorReady(final Timer timer) {
}
}

private boolean coordinatorReady() {
boolean coordinatorReady() {
CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.orElseThrow(
() -> new IllegalStateException("CoordinatorRequestManager uninitialized."));
Optional<Node> coordinator = coordinatorRequestManager.coordinator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ public interface MembershipManager {
*/
CompletableFuture<Void> leaveGroup();

/**
* Leaving the group when the user closes the consumer.
*/
void leaveGroupOnClose();

/**
* @return True if the member should send heartbeat to the coordinator without waiting for
* the interval.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,14 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
/**
* TopicPartition comparator based on topic name and partition id.
*/
private final static TopicPartitionComparator TOPIC_PARTITION_COMPARATOR =
final static TopicPartitionComparator TOPIC_PARTITION_COMPARATOR =
new TopicPartitionComparator();

/**
* TopicIdPartition comparator based on topic name and partition id (ignoring ID while sorting,
* as this is sorted mainly for logging purposes).
*/
private final static TopicIdPartitionComparator TOPIC_ID_PARTITION_COMPARATOR =
final static TopicIdPartitionComparator TOPIC_ID_PARTITION_COMPARATOR =
new TopicIdPartitionComparator();

/**
Expand Down Expand Up @@ -481,6 +481,28 @@ public CompletableFuture<Void> leaveGroup() {
return leaveResult;
}

/**
 * Leaves the group when the consumer is being closed. The subscriptions are reset by
 * the application thread, so this only needs to transition the state machine to
 * LEAVING and set the leave epoch (-1/-2).
 */
@Override
public void leaveGroupOnClose() {
    switch (state) {
        case UNSUBSCRIBED:
        case FATAL:
        case LEAVING:
            // Already out of the group or already on the way out — nothing to do.
            return;
        case PREPARE_LEAVING:
            // Callback phase already started; just advance to sending the leave heartbeat.
            transitionToSendingLeaveGroup();
            return;
        default:
            transitionTo(MemberState.PREPARE_LEAVING);
            transitionToSendingLeaveGroup();
            // No callbacks to wait for on close, so the in-progress future is pre-completed.
            leaveGroupInProgress = Optional.of(CompletableFuture.completedFuture(null));
    }
}

/**
* Release member assignment by calling the user defined callbacks for onPartitionsRevoked or
* onPartitionsLost.
Expand Down
Loading