Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1234,7 +1234,7 @@ private void close(Duration timeout, boolean swallowException) {
clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis()));
closeTimer.update();
// Prepare shutting down the network thread
prepareShutdown(closeTimer, firstException);
releaseAssignmentAndLeaveGroup(closeTimer, firstException);
closeTimer.update();
swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback.",
() -> awaitPendingAsyncCommitsAndExecuteCommitCallbacks(closeTimer, false), firstException);
Expand Down Expand Up @@ -1270,12 +1270,12 @@ private void close(Duration timeout, boolean swallowException) {
* 2. revoke all partitions
* 3. if partition revocation completes successfully, send leave group
*/
void prepareShutdown(final Timer timer, final AtomicReference<Throwable> firstException) {
void releaseAssignmentAndLeaveGroup(final Timer timer, final AtomicReference<Throwable> firstException) {
if (!groupMetadata.get().isPresent())
return;

if (autoCommitEnabled)
autoCommitSync(timer);
commitSyncAllConsumed(timer);

applicationEventHandler.add(new CommitOnCloseEvent());
completeQuietly(
Expand All @@ -1287,7 +1287,7 @@ void prepareShutdown(final Timer timer, final AtomicReference<Throwable> firstEx
}

// Visible for testing
void autoCommitSync(final Timer timer) {
void commitSyncAllConsumed(final Timer timer) {
Map<TopicPartition, OffsetAndMetadata> allConsumed = subscriptions.allConsumed();
log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,7 @@ private void reapExpiredApplicationEvents(long currentTimeMs) {
*/
// Visible for testing
static void runAtClose(final Collection<Optional<? extends RequestManager>> requestManagers,
final NetworkClientDelegate networkClientDelegate,
final Timer timer) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this timer was expected to be used by pollOnClose. It seems to me pollOnClose should take a timeout, so the request managers have the chance to return on time in the future.

Also, timer.update() should be called at the end. Otherwise, the later operations (e.g. sendUnsentRequests) will see an incorrect remaining timeout.

Copy link
Copy Markdown
Member Author

@lianetm lianetm Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, that was probably the thinking behind the timer here, but it ended up unused, and that actually seems sensible to me (because the timer is used somewhere else). This is how I see it: pollOnClose only builds requests without blocking on any activity that would need a time boundary (just like poll() doesn't take a timer to limit its execution). It's then sendUnsentRequests that takes the requests created by pollOnClose and performs the operation that needs the timer (and it already has one).

Taking the pollOnClose on the FetchRequestManager as example (actually the only manager where it's used), it only generates a fetch request to close the fetch sessions (no timer needed). Makes sense?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for sharing — I agree with the usage of pollOnClose you described, so +1 to removing the timer :)

final NetworkClientDelegate networkClientDelegate) {
// These are the optional outgoing requests at the
requestManagers.stream()
.filter(Optional::isPresent)
Expand Down Expand Up @@ -300,15 +299,21 @@ private void sendUnsentRequests(final Timer timer) {
networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
timer.update();
} while (timer.notExpired() && networkClientDelegate.hasAnyPendingRequests());

if (networkClientDelegate.hasAnyPendingRequests()) {
log.warn("Close timeout of {} ms expired before the consumer network thread was able " +
"to complete pending requests. Inflight request count: {}, Unsent request count: {}",
timer.timeoutMs(), networkClientDelegate.inflightRequestCount(), networkClientDelegate.unsentRequests().size());
}
}

void cleanup() {
log.trace("Closing the consumer network thread");
Timer timer = time.timer(closeTimeout);
try {
runAtClose(requestManagers.entries(), networkClientDelegate, timer);
runAtClose(requestManagers.entries(), networkClientDelegate);
} catch (Exception e) {
log.error("Unexpected error during shutdown. Proceed with closing.", e);
log.error("Unexpected error during shutdown. Proceed with closing.", e);
} finally {
sendUnsentRequests(timer);
applicationEventReaper.reap(applicationEventQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ Queue<UnsentRequest> unsentRequests() {
return unsentRequests;
}

/**
 * Returns the number of requests that have been sent to a broker but not yet
 * completed, as reported by the underlying network client
 * ({@code client.inFlightRequestCount()}).
 *
 * <p>Used on consumer close to report how many requests were still pending when
 * the close timeout expired (see the warning logged in
 * {@code ConsumerNetworkThread.sendUnsentRequests}).
 *
 * @return the current in-flight request count
 */
public int inflightRequestCount() {
return client.inFlightRequestCount();
}

/**
* Check if the node is disconnected and unavailable for immediate reconnection (i.e. if it is in
* reconnect backoff window following the disconnect).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ public void testAutoCommitSyncEnabled() {
consumer.subscribe(singleton("topic"), mock(ConsumerRebalanceListener.class));
subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0)));
subscriptions.seek(new TopicPartition("topic", 0), 100);
consumer.autoCommitSync(time.timer(100));
consumer.commitSyncAllConsumed(time.timer(100));
verify(applicationEventHandler).add(any(SyncCommitEvent.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ public void testFetcherCloseClosesFetchSessionsInBroker() {
// NOTE: by design the FetchRequestManager doesn't perform network I/O internally. That means that calling
// the close() method with a Timer will NOT send out the close session requests on close. The network
// I/O logic is handled inside ConsumerNetworkThread.runAtClose, so we need to run that logic here.
ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), networkClientDelegate, timer);
ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), networkClientDelegate);
// the network is polled during the last state of clean up.
networkClientDelegate.poll(time.timer(1));
// validate that closing the fetcher has sent a request with final epoch. 2 requests are sent, one for the
Expand Down