KAFKA-15696: Refactor AsyncConsumer close procedure #14920

Closed
philipnee wants to merge 3 commits into apache:trunk from philipnee:close-on-heartbeat-leave-group

@philipnee
Contributor

@philipnee philipnee commented Dec 4, 2023

This PR covers several tickets: KAFKA-15696 and KAFKA-15548.

When closing the consumer, we need to perform a few tasks:

  1. If auto-commit is enabled, send a commitSync and block until it completes
  2. Invoke all offsetCommitCallbacks and ensure all inflight commits are sent
  3. Invoke the partitions-revoked callbacks
  4. Unsubscribe from all partitions
  5. LeaveGroup
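
The ordering above can be sketched roughly as follows. This is only an illustration of the sequence, not the code in this PR: the class `CloseSequenceSketch`, the `runStep` helper, and the step names are all hypothetical, and each step body is a placeholder.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the close ordering; none of these names come from the PR.
class CloseSequenceSketch {
    // Records which steps ran, in order, for illustration only.
    final List<String> steps = new ArrayList<>();
    private final boolean autoCommitEnabled;
    private Exception firstException;

    CloseSequenceSketch(boolean autoCommitEnabled) {
        this.autoCommitEnabled = autoCommitEnabled;
    }

    // Runs one step; remembers the first failure but keeps going with later steps.
    private void runStep(String name, Runnable action) {
        try {
            action.run();
            steps.add(name);
        } catch (Exception e) {
            if (firstException == null)
                firstException = e;
        }
    }

    void close() {
        if (autoCommitEnabled)
            runStep("commitSync", () -> { /* placeholder: block until offsets are committed */ });
        runStep("invokeCommitCallbacks", () -> { /* placeholder */ });
        runStep("invokePartitionsRevoked", () -> { /* placeholder */ });
        runStep("unsubscribe", () -> { /* placeholder */ });
        runStep("leaveGroup", () -> { /* placeholder */ });
        // Surface the first failure only after every step has had a chance to run.
        if (firstException != null)
            throw new RuntimeException("close failed", firstException);
    }

    public static void main(String[] args) {
        CloseSequenceSketch consumer = new CloseSequenceSketch(true);
        consumer.close();
        System.out.println(consumer.steps);
    }
}
```

Recording the first exception and continuing means a failure in an early step (for example the commit) does not prevent the later steps, such as leaving the group, from being attempted.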

@philipnee
Contributor Author

Hi @lucasbru - I intended to open this for review; however, I'm having a hard time getting testCommitAsyncLeaderEpochUpdate (as well as several other tests) to pass, because it somehow seems to try to send a commit request with epoch -1 in it. I thought the intention was just to send an async request, so I wonder why the epoch is -1...

@lucasbru
Member

lucasbru commented Dec 5, 2023

@philipnee I don't see AsyncKafkaConsumer failing in CI, so I'm not sure. Have you fixed it already? Epoch -1 might happen because we haven't heartbeated yet, so we haven't joined the group by the time the commit is requested. The test works anyway because we don't care about the background thread, but, yeah, things will be in a weird state when we try to close the consumer. We could make the commit fail if we haven't heartbeated yet. But it's hard to say, because I don't see a test failing.

I'd suggest getting this PR in shape and testing the close functionality specifically. If something in the existing test setup needs to change to make the close go through smoothly in the other tests, we can do that (once we see it failing on CI), but ideally we just move the problematic tests out of the spy-based test framework, which should resolve the problem (as no close functionality will be required). I'm working on a PR to move some of those tests out of AsyncKafkaConsumerTest. Feel free to ignore that problem for now and just clean up this PR.
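
For illustration, the idea of failing a commit that is requested before the first heartbeat could look roughly like the sketch below. It assumes that a member epoch of -1 means "not yet joined", which is only the guess made in this comment; `EpochGuardSketch` and its methods are hypothetical and are not part of the consumer code.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; the class and method names are not from the Kafka code base.
class EpochGuardSketch {
    // Assumption: -1 means no heartbeat response has been received yet.
    private int memberEpoch = -1;

    void onHeartbeatResponse(int epoch) {
        this.memberEpoch = epoch;
    }

    // Fails fast instead of sending a commit request that carries epoch -1.
    CompletableFuture<Void> commitAsync() {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (memberEpoch < 0) {
            result.completeExceptionally(
                new IllegalStateException("Cannot commit before joining the group"));
        } else {
            // Placeholder: a real implementation would enqueue the commit request here.
            result.complete(null);
        }
        return result;
    }

    public static void main(String[] args) {
        EpochGuardSketch sketch = new EpochGuardSketch();
        System.out.println(sketch.commitAsync().isCompletedExceptionally());
        sketch.onHeartbeatResponse(1);
        System.out.println(sketch.commitAsync().isCompletedExceptionally());
    }
}
```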

@philipnee
Contributor Author

Hi @lucasbru - Thanks for the response. Unfortunately, because of the way the mock is set up and how the other tests are written, close will depend on the network thread, which means we are actually testing all related components. I'll get the PR in shape and have you review it ASAP.

@philipnee philipnee force-pushed the close-on-heartbeat-leave-group branch from 7eb1e25 to 4e2a6f6 Compare December 5, 2023 21:14
@philipnee philipnee changed the title KAFKA-00000 Handle consumer close KAFKA-15696: AsyncConsumer close procedure Dec 5, 2023
@philipnee philipnee marked this pull request as ready for review December 5, 2023 21:25
@philipnee philipnee changed the title KAFKA-15696: AsyncConsumer close procedure KAFKA-15696: Refactor AsyncConsumer close procedure Dec 5, 2023
Contributor

@kirktrue kirktrue left a comment

Thanks for the PR, @philipnee. I left a few comments/questions.

Comment on lines +950 to +1044
// Ensure all async commit callbacks are invoked
swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback", this::maybeInvokeCommitCallbacks, firstException);
Contributor

These should be done before we shut down the network, right?

Comment on lines +969 to +1112
/**
 * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
 * 1. autocommit offsets
 * 2. revoke all partitions
 */
private void prepareShutdown(final Timer timer) {
    if (!groupMetadata.isPresent())
        return;

    maybeAutoCommitSync(timer);
    timer.update();
    if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty())
        return;

    try {
        // If the consumer is in a group, we will pause and revoke all assigned partitions
        onLeavePrepare().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
        timer.update();
    } catch (Exception e) {
        Exception exception = e;
        if (e instanceof ExecutionException)
            exception = (Exception) e.getCause();
        throw new KafkaException("User rebalance callback throws an error", exception);
    } finally {
        subscriptions.assignFromSubscribed(Collections.emptySet());
    }
}

private void maybeAutoCommitSync(final Timer timer) {
    if (autoCommitEnabled) {
        Map<TopicPartition, OffsetAndMetadata> allConsumed = subscriptions.allConsumed();
        try {
            log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed);
            commitSync(allConsumed, Duration.ofMillis(timer.remainingMs()));
        } catch (Exception e) {
            // consistent with async auto-commit failures, we do not propagate the exception
            log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage());
        }
    }
}

private CompletableFuture<Void> onLeavePrepare() {
    SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR);
    droppedPartitions.addAll(subscriptions.assignedPartitions());
    if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) {
        return CompletableFuture.completedFuture(null);
    }
    // TODO: Invoke rebalanceListener via KAFKA-15276
    return CompletableFuture.completedFuture(null);
}

Contributor

I would imagine that the leave group process could be mostly performed using an event and a callback execution, like in #14931.

We'd need to submit an event to the background thread (e.g. PrepareCloseEvent) so that the member manager can orchestrate the callback request and the heartbeat request.
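
A minimal sketch of that event-based shape, under stated assumptions: `PrepareCloseEvent` is the hypothetical name used above, and the queue, the thread, and the `close` method below are stand-ins for illustration only. They are not the consumer's actual event classes, and the callback and heartbeat orchestration is reduced to a placeholder.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch; these are not the consumer's real event or thread classes.
class PrepareCloseSketch {
    // Hypothetical event carrying a future that the background thread completes.
    static final class PrepareCloseEvent {
        final CompletableFuture<Void> future = new CompletableFuture<>();
    }

    private final BlockingQueue<PrepareCloseEvent> queue = new LinkedBlockingQueue<>();

    // Background thread: takes one event, runs the leave-group work, completes the future.
    Thread startBackgroundThread() {
        Thread thread = new Thread(() -> {
            try {
                PrepareCloseEvent event = queue.take();
                // Placeholder: callback request and leave-group heartbeat would be orchestrated here.
                event.future.complete(null);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    // Application thread: submits the event and waits up to timeoutMs for completion.
    boolean close(long timeoutMs) {
        PrepareCloseEvent event = new PrepareCloseEvent();
        queue.add(event);
        try {
            event.future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        PrepareCloseSketch sketch = new PrepareCloseSketch();
        sketch.startBackgroundThread();
        System.out.println(sketch.close(1000));
    }
}
```

The application thread only blocks on the future, so the close timeout bounds the wait regardless of how the background thread sequences the work.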

Contributor Author

Thanks. I mostly agree with your idea. I think simply firing the revocation callback from close() should be enough, but sending leave-group and closing events as you suggested is a good idea.

Member

Several people seem to agree that we should solve this as much as possible via an event. Is the new draft PR going to replace this PR, or should we try to merge this one?

Contributor Author

I want to be consistent with Kirk's approach for unsubscribe() in terms of callback invocation, so I'm taking my previous comment back (sorry about the confusion).

}
}

private CompletableFuture<Void> onLeavePrepare() {
Member

I may be missing a bit of context, but I'm not yet sure what this function is achieving. If this is for KAFKA-15276, maybe we can implement it as part of that ticket, because this function mostly confuses me.

Contributor Author

I need to speak to Kirk about how he wants to implement the callback invocation.

metadata,
subscriptions,
fetchConfig,
deserializers,
Member

It seems like you have slightly different formatting settings than somebody else. I wouldn't do these kinds of whitespace changes unless it's obviously unclean

commitFuture.complete(null);

prepareCommit(Arrays.asList(t1, t0), DEFAULT_GROUP_ID, Errors.NONE);
// TODO: The log shows NPE thrown from the CommitRequestManager, which is caused by the use of mock.
Member

I think as long as the test isn't failing this is fine, since the problem should go away once we move to mocks

@philipnee philipnee force-pushed the close-on-heartbeat-leave-group branch from 80e6162 to 364782a Compare December 6, 2023 18:17
Update AsyncKafkaConsumer.java

test ensure heartbeat manager is polled

Clean up

Update AsyncKafkaConsumerTest.java

clean up

Update ConsumerNetworkThreadTest.java

leave group on close

stuff
@philipnee philipnee force-pushed the close-on-heartbeat-leave-group branch from 364782a to e22a19f Compare December 6, 2023 18:19
@philipnee
Contributor Author

Thanks @lucasbru and @kirktrue - I'm closing this one and opening another one.

@philipnee philipnee closed this Dec 6, 2023