KAFKA-15696: Refactor closing consumer #14937
Conversation
| @Test | ||
| public void testGroupIdNotNullAndValid() { | ||
| // close the default consumer | ||
| shutDown(); |
Isn't this going to happen in afterAll anyway?
The test spins up another consumer, so we should shut down the one created in BeforeEach.
|
Yes, I think using events is much clearer. @kirktrue do you agree with this approach? Then I'd suggest we close the other PR and continue with this one. |
3207f21 to 2b590bf Compare
| final AtomicReference<Throwable> firstException) { | ||
| try { | ||
| applicationEventHandler.addAndGet(event, timer); | ||
| } catch (TimeoutException e) { |
We don't really throw timeout exceptions during closing, because if the user tries to close with a 0 duration then all ops would be timed out. The current implementation just polls, but since we cannot poll the client directly, we need to wait until the future completes or times out, and then keep going.
Yes, we have issues with timeouts of 0 elsewhere. There's a Jira somewhere to solve it, but it's not been designed/fixed.
@kirktrue and I discussed the potential tasks for dealing with zero timeout. This needs to be examined, perhaps after the preview, so we will spin off a Jira ticket for this specific issue.
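To illustrate the "wait until the future completes or the timer runs out, then keep going" behaviour discussed here, a minimal sketch; the helper name and signature are assumptions for illustration, not the PR's actual code:

```java
// Minimal sketch (not the PR's actual code): block on a close-related future for at most
// the remaining close time, swallow the timeout, and continue shutting down.
// Assumes java.util.concurrent.*, org.apache.kafka.common.utils.Timer, and an slf4j `log` field.
private void waitOnCloseFuture(final CompletableFuture<Void> future,
                               final Timer timer,
                               final AtomicReference<Throwable> firstException) {
    try {
        future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        // close(Duration.ZERO) lands here for every operation; proceed rather than fail
        log.debug("Timed out waiting for a close operation; continuing shutdown");
    } catch (InterruptedException | ExecutionException e) {
        firstException.compareAndSet(null, e);
    } finally {
        timer.update();
    }
}
```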
0158de2 to a2e3ed1 Compare
|
Hi @kirktrue - I rewrote the previous PR based on your feedback. I thought driving the close via events is a better and clearer pattern, so thanks for the suggestions. Would you have time to take a look at this PR? @lucasbru - Thanks for reviewing the PR - I've decided, per your suggestion, to use Kirk's approach to close the consumer. Let me know what you think. |
| @@ -274,79 +269,18 @@ private void closeInternal(final Duration timeout) { | |||
| } | |||
|
|
|||
| void cleanup() { | |||
Really not much to do when shutting down the network thread: we will try one more time to send the unsent requests and poll the network client to make sure all requests are sent.
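Roughly, the cleanup described here could look like the following sketch; the method and field names are assumptions, not the PR's exact implementation:

```java
// Hypothetical sketch of the network thread's cleanup(): try once more to send any
// unsent requests and poll the client so they reach the socket, then close quietly.
// Method names/signatures here are assumptions, not the PR's exact code.
void cleanup() {
    try {
        sendUnsentRequests(closeTimer);                     // assumed helper, see discussion below
        networkClientDelegate.poll(0, time.milliseconds()); // one final poll
    } catch (Exception e) {
        log.error("Unexpected error during shutdown. Proceed with closing.", e);
    } finally {
        closeQuietly(networkClientDelegate, "network client delegate");
    }
}
```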
kirktrue
left a comment
This is really tricky, @philipnee 😞
I think we need to resolve the behavioral ambiguity around a user invoking close(0) ASAHP.
| } catch (Exception e) { | ||
| log.error("Unexpected error during shutdown. Proceed with closing.", e); | ||
| } finally { | ||
| networkClientDelegate.awaitPendingRequests(timer); |
Network requests are tied to the CompletableApplicationEvents, right? Can we just rely on the events to wait for their network I/O to complete via the addAndGet() method?
Can there be other requests (not tied to the closing application events) that we want to wait for as long as we still have time?
I think we actually don't need this here, because after checking the code, runAtClose already handles the closing. If the timer runs out, we don't need to poll again; if all requests are completed before the timer runs out, we don't need to re-poll either.
| case COMMIT: | ||
| log.debug("Sending unsent commit before closing."); | ||
| sendUnsentCommit(); | ||
| event.future().complete(null); |
This is a bit of a different pattern from our other CompletableApplicationEvents. In the other events, we completed the Future when the response was processed. In these events, we're completing them just after sending off the request. Is that truly what we want?
Yeah, as long as our timeout did not expire, we probably want to wait for the response, right?
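As a rough sketch of the alternative being suggested (completing the event only once the commit request's response comes back, rather than right after sending), with hypothetical names:

```java
// Hypothetical: chain the close event's future to the commit request's own future so it
// completes when the response (or failure) is processed, not at send time.
CompletableFuture<Void> commitRequestDone = sendUnsentCommit(); // assumed to return the request's future
commitRequestDone.whenComplete((ignored, error) -> {
    if (error != null)
        event.future().completeExceptionally(error);
    else
        event.future().complete(null);
});
```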
| case PREP_CLOSING: | ||
| processPrepClosingEvent((ConsumerCloseApplicationEvent) event); | ||
| return; | ||
|
|
Any reason we can't have these as separate types like the other events?
| private void sendUnsentCommit() { | ||
| if (!requestManagers.commitRequestManager.isPresent()) | ||
| return; | ||
| NetworkClientDelegate.PollResult res = requestManagers.commitRequestManager.get().pollOnClose(); | ||
| if (res.unsentRequests.isEmpty()) | ||
| return; | ||
| // NetworkThread will continue to poll the networkClientDelegate | ||
| networkClientDelegate.addAll(res); | ||
| } | ||
|
|
I'm not quite understanding why this needs to be done as a special case. Why can't we rely on the normal runOnce() invocation to poll() the request managers?
| private final Logger log; | ||
| private final ConsumerMetadata metadata; | ||
| private final RequestManagers requestManagers; | ||
| private final NetworkClientDelegate networkClientDelegate; |
I'm uncomfortable with introducing the NetworkClientDelegate at this layer. It's centralized in ConsumerNetworkThread so that we can reason about where the various network I/O is performed.
| */ | ||
| package org.apache.kafka.clients.consumer.internals.events; | ||
|
|
||
| public class ConsumerCloseApplicationEvent extends CompletableApplicationEvent<Void> { |
I'm happy to have a superclass for 'close' events, but having a type and a task gets a bit muddy, doesn't it?
Why not just have separate event types as per the rest of the codebase?
| final AtomicReference<Throwable> firstException) { | ||
| try { | ||
| applicationEventHandler.addAndGet(event, timer); | ||
| } catch (TimeoutException e) { |
Yes, we have issues with timeouts of 0 elsewhere. There's a Jira somewhere to solve it, but it's not been designed/fixed.
| // Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until | ||
| // all requests have received a response. | ||
| do { | ||
| while (timer.notExpired() && !requestFutures.stream().allMatch(Future::isDone)) { |
Why are you changing this back?
If the close timer has expired, should we proceed with closing without sending the request? I'm undecided on this. @kirktrue wdyt?
| return EMPTY; | ||
|
|
||
| List<NetworkClientDelegate.UnsentRequest> requests = pendingRequests.drainOnClose(); | ||
| System.out.print("ddraining + " + requests); |
| } | ||
| } | ||
|
|
||
| private CompletableFuture<Void> onLeavePrepare() { |
Not sure that is the best name to describe what it does.
| } catch (Exception e) { | ||
| log.error("Unexpected error during shutdown. Proceed with closing.", e); | ||
| } finally { | ||
| networkClientDelegate.awaitPendingRequests(timer); |
Can there be other requests (not tied to the closing application events) that we want to wait for as long as we still have time?
| case COMMIT: | ||
| log.debug("Sending unsent commit before closing."); | ||
| sendUnsentCommit(); | ||
| event.future().complete(null); |
Yeah, as long as our timeout did not expire, we probably want to wait for the response, right?
| droppedPartitions.addAll(subscriptions.assignedPartitions()); | ||
| if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) | ||
| return CompletableFuture.completedFuture(null); | ||
| // TODO: Invoke rebalanceListener via KAFKA-15276 |
@kirktrue - I am not 100% sure what the right way to invoke the listener is. Are we returning a completable future? The current implementation blocks on listener invocation, which means we need to do future.get(forever). If the listener is broken in some way, then we are stuck here.
Can we merge it without resolving this comment?
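One possible shape for this, sketched with a bounded wait instead of future.get(forever) so a broken listener cannot hang close(); the names here are illustrative, not KAFKA-15276's actual API:

```java
// Hypothetical: invoke the rebalance listener for the dropped partitions and wait only
// for the remaining close time, logging instead of blocking forever on a broken listener.
CompletableFuture<Void> listenerDone = invokeOnPartitionsRevoked(droppedPartitions); // assumed helper
try {
    listenerDone.get(closeTimer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
    log.warn("ConsumerRebalanceListener did not finish before the close timeout; proceeding");
} catch (InterruptedException | ExecutionException e) {
    log.error("Error invoking ConsumerRebalanceListener during close", e);
}
```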
| final Timer timer) { | ||
| // These are the optional outgoing requests at the | ||
| List<NetworkClientDelegate.PollResult> pollResults = requestManagers.stream() | ||
| requestManagers.stream() |
See the comment in the FetchRequestManager.
Can we merge it without resolving this comment?
This is not a conflict, actually - these are just some changes to how the fetch request manager closes.
| */ | ||
| @Override | ||
| public PollResult pollOnClose() { | ||
| // TODO: move the logic to poll to handle signal close |
I added a method signalClose() to the interface. I wonder if we should keep letting the network thread poll the network client as usual, until we actually invoke close. This means close would do very little besides checking whether there are any pending requests.
Yes, using the normal poll loop sounds like a good idea. We should still probably sendUnsentRequests once when the timeout has passed.
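The pattern being agreed on here could look roughly like this sketch (generic types and names, not the actual CommitRequestManager):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative only: signalClose() just flips a flag; the network thread keeps driving
// the usual poll() loop, which drains the close-time requests on its next pass.
class ClosableRequestManagerSketch {
    private final List<String> pendingCommits = new ArrayList<>(); // stand-in for unsent requests
    private volatile boolean closing = false;

    void signalClose() {
        closing = true; // prepare for shutdown; nothing is sent from the caller's thread
    }

    List<String> poll() {
        if (!closing)
            return Collections.emptyList(); // normal path: nothing special in this sketch
        // Close path: hand the final requests (e.g. an auto-commit) to the network thread once.
        List<String> toSend = new ArrayList<>(pendingCommits);
        pendingCommits.clear();
        return toSend;
    }
}
```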
| final String groupId = "consumerGroupA"; | ||
| final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId)); | ||
| final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>(); | ||
| try (final AsyncKafkaConsumer<String, String> consumer = |
Removing the try-with-resources so we can close the consumer with a 0 timeout. @lucasbru
Please be aware that all these changes will be unnecessary/conflicting once we move to mocks, which is rather soon I believe, see:
We will get lots of conflicts, and I think whoever has to resolve the conflicts will probably just skip the changes from this PR, because the commit logic in the background thread will not be executed anymore in this test.
OK, makes sense, I can revert these changes.
I didn't mean to revert the changes, but to discard them when we merge with the AsyncKafkaConsumer refactoring. Is it possible these changes were required to avoid running OOM?
One option would be to rebase this PR on the refactoring, which could possibly resolve these OOMs.
| ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), networkClientDelegate, timer); | ||
|
|
||
| // the network is polled during the last state of clean up. | ||
| networkClientDelegate.poll(time.timer(1)); |
Does Kirk need to confirm this change before we can merge the PR?
c0e3780 to 86e6943 Compare
lucasbru
left a comment
I had a look; it seems like we are still iterating on the design. Left some comments.
| final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, | ||
| metadata, | ||
| applicationEventQueue, | ||
| applicationEventQueue, |
| logContext, | ||
| metadata, | ||
| applicationEventQueue, | ||
| applicationEventQueue, |
| @@ -178,27 +171,11 @@ static void runAtClose(final Collection<Optional<? extends RequestManager>> requ | |||
| final NetworkClientDelegate networkClientDelegate, | |||
| final Timer timer) { | |||
Seems timer is now completely unused.
Noted, let's get rid of runAtClose altogether in a future PR.
| private void sendUnsentRequests(final Timer timer) { | ||
| // Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until | ||
| // all requests have received a response. | ||
| while (!networkClientDelegate.unsentRequests().isEmpty() && timer.notExpired()) { |
Closing with timeout 0 would mean we don't send any closing requests, right? I think we should poll nevertheless, so we should check the timer at the end.
I think if we used the normal poll loop as long as the timeout > 0, this function may not need to check the timer anyway, since it's only used if the time ran out and there are still unsent requests.
Sorry, to reiterate on your comment, perhaps your suggestion is: if time has run out, we do client.poll(0) to try to send and process the requests one last time; if the time hasn't run out and there are still requests to be sent, we continue to poll until all requests are sent or the timer runs out. Is this what you meant?
Well, if all requests are sent, I wouldn't time out. But otherwise, yes.
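Putting that agreement into a sketch (poll at least once even with a zero timeout, stop as soon as the unsent queue drains or the timer expires); the delegate's method names follow this PR's diff but are otherwise assumptions:

```java
// Hypothetical sketch: even close(0) gets one last poll so already-queued requests can
// hit the socket; otherwise keep polling until everything is sent or the timer expires.
private void sendUnsentRequests(final Timer timer) {
    do {
        networkClientDelegate.poll(0, timer.currentTimeMs()); // assumed poll(timeoutMs, currentTimeMs)
        timer.update();
    } while (!networkClientDelegate.unsentRequests().isEmpty() && timer.notExpired());
}
```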
| */ | ||
| @Override | ||
| public PollResult pollOnClose() { | ||
| // TODO: move the logic to poll to handle signal close |
Yes, using the normal poll loop sounds like a good idea. We should still probably sendUnsentRequests once when the timeout has passed.
739bd4f to afb605d Compare
lucasbru
left a comment
Approved from my side with minor comments, but it needs to be rebased, approved by Kirk, and to pass CI.
| closeQuietly(fetchBuffer, "Failed to close the fetch buffer", firstException); | ||
| closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException); | ||
| closeTimer.update(); | ||
| // Ensure all async commit callbacks are invoked |
Might not even need this comment.
| droppedPartitions.addAll(subscriptions.assignedPartitions()); | ||
| if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) | ||
| return CompletableFuture.completedFuture(null); | ||
| // TODO: Invoke rebalanceListener via KAFKA-15276 |
Can we merge it without resolving this comment?
| final Timer timer) { | ||
| // These are the optional outgoing requests at the | ||
| List<NetworkClientDelegate.PollResult> pollResults = requestManagers.stream() | ||
| requestManagers.stream() |
Can we merge it without resolving this comment?
| ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), networkClientDelegate, timer); | ||
|
|
||
| // the network is polled during the last state of clean up. | ||
| networkClientDelegate.poll(time.timer(1)); |
Does Kirk need to confirm this change before we can merge the PR?
afb605d to 0fc4a0a Compare
| assertTrue(applicationEventsQueue.isEmpty()); | ||
| } | ||
|
|
||
| @Test |
Removed because they are irrelevant now.
kirktrue
left a comment
Thanks for the PR, @philipnee. Please make sure to file those follow-up Jiras 😉
|
Restarting the build as the previous one was aborted. @philipnee have you run the integration tests on this one? |
|
Sorry @lucasbru - I forgot to post the integration results, but I did run them. Let me do that for the record. |
|
CI ran. Can you check if this PR is in any way related? https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-14937/runs/26/nodes/11/steps/87/log/?start=0 |
|
@philipnee If tests are flaky in the PR that are also flaky on trunk, I don't think this is blocking the merge given the current state of the CI. The problem is that the builds fail completely, so there is no way to tell whether this PR introduces new problems or not. I think this may also be caused by other changes on trunk. For now, I think we have to restart CI and hope for the situation to improve, or debug the build failures ourselves. I looked into it briefly (checking the last few runs on trunk), and it seemed to me that a lot of failures of Gradle Executors followed the execution of |
|
@philipnee I think part of the reason why things are running particularly badly this weekend might be a change in the transaction tests. I proposed a revert here: #15029. I will retrigger the CI for this PR for now. Let's see if we get the revert PR merged; then we can rebase this PR and hopefully get stable runs again. |
|
Thanks @lucasbru |
d09d3be to 3dc624d Compare
more clean up
clean up
clean up
fix broken tests
clean up
refactor based on PR comment
clean up
Update ApplicationEventProcessorTest.java
Update MetadataTest.java
d22c9ef to 1786208 Compare
|
Hey @lucasbru - I think your patch fixed the long running/OOM issue with the async consumer test. The tests finished within 3.5 hours in this commit: 1786208. They are observed sparsely in other builds I've seen, so I'm disabling them in the latest commit: 11a3ae6 |
| List<ConsumerPartitionAssignor> assignors, | ||
| String groupId, | ||
| String clientId) { | ||
| return new AsyncKafkaConsumer<>( |
@philipnee I removed this on purpose from the test file so as to use the normal constructor and not just test a mock construction of the class. I'm sure there could have been a better way than to reintroduce the constructor that I removed. However, since we are otherwise not converging with this PR, I am going to merge this. Please consider following up with a PR that removes this constructor again.
We drive the consumer closing via events and rely on the still-alive network thread to complete these operations. This work encompasses several tickets: KAFKA-15696/KAFKA-15548.

When closing the consumer we need to perform a few tasks. Here is the top-level overview: we want to keep the network thread alive until we are ready to shut down, i.e., until no more requests need to be sent out. To achieve this, I implemented a method, signalClose(), to signal the managers to prepare for shutdown. Once we signal the network thread to close, the managers prepare their remaining requests to be sent out on the next event loop, and the network thread can be closed after issuing these events. The application thread's task is straightforward: 1. tell the background thread to perform the close events, and 2. block on certain events until they succeed or the timer runs out. Once all requests are sent out, we close the network thread and other components as usual.

Here I outline the changes in detail:
- AsyncKafkaConsumer: shutdown procedures, plus several utility functions to ensure proper exceptions are thrown during shutdown
- AsyncKafkaConsumerTest: examined each individual test and fixed the ones that block for too long or log errors
- CommitRequestManager: signalClose()
- FetchRequestManagerTest: changes due to the change in pollOnClose()
- ApplicationEventProcessor: handle CommitOnClose and LeaveGroupOnClose; the latter triggers leaveGroup(), which should be completed on the next heartbeat (or we time out on the application thread)

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Kirk True <ktrue@confluent.io>
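For readers skimming the description, a condensed sketch of the application-thread side of this flow; the event class names and helpers are assumptions for illustration, not the exact code merged here:

```java
// Hypothetical outline of closeInternal(): drive the close via events, block on each one
// until it succeeds or the timer runs out, then shut down the network thread as usual.
private void closeInternal(final Duration timeout) {
    final Timer closeTimer = time.timer(timeout);
    final AtomicReference<Throwable> firstException = new AtomicReference<>();

    // 1. Ask the background thread to send the final commit and leave the group.
    for (CompletableApplicationEvent<Void> event :
            Arrays.<CompletableApplicationEvent<Void>>asList(
                    new CommitOnCloseEvent(), new LeaveGroupOnCloseEvent())) { // assumed event types
        try {
            applicationEventHandler.addAndGet(event, closeTimer);
        } catch (Exception e) {
            firstException.compareAndSet(null, e);
        } finally {
            closeTimer.update();
        }
    }

    // 2. Once the close events are done (or timed out), close the network thread and the
    //    remaining components with whatever time is left.
    closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())),
            "Failed shutting down network thread", firstException);

    if (firstException.get() != null)
        throw new KafkaException("Failed to close Kafka consumer", firstException.get());
}
```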