KAFKA-7109: Close fetch sessions on close of consumer #12590

Merged
dajac merged 6 commits into apache:trunk from divijvaidya:KAFKA-7109
Feb 9, 2023

Conversation

@divijvaidya
Member

Problem

When a consumer is closed, the fetch sessions associated with it should notify the server of their intention to close using a Fetch request with epoch = -1 (identified by FINAL_EPOCH in FetchMetadata.java). However, we are not sending this final fetch request in the current flow, which leads to unnecessary fetch sessions on the server that are closed only after a timeout.
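The epoch convention described above can be sketched in a small self-contained example. This is a hedged illustration: the INITIAL_EPOCH/FINAL_EPOCH values follow the PR description, but the class and helper names here are assumptions, not the exact FetchMetadata code.

```java
// Sketch of fetch-session epoch bookkeeping, loosely modeled on
// FetchMetadata. INITIAL_EPOCH / FINAL_EPOCH values follow the PR text;
// nextEpoch is an illustrative helper, not the exact Kafka method.
public class FetchEpochSketch {
    static final int INITIAL_EPOCH = 0; // first request of a new session
    static final int FINAL_EPOCH = -1;  // signals "close this session"

    // Advance the epoch; a session that has signaled close stays closed.
    static int nextEpoch(int prevEpoch) {
        if (prevEpoch < 0) return FINAL_EPOCH;
        return prevEpoch + 1;
    }

    public static void main(String[] args) {
        int epoch = INITIAL_EPOCH;
        epoch = nextEpoch(epoch);
        System.out.println("epoch after first fetch = " + epoch);
        System.out.println("close request uses epoch = " + FINAL_EPOCH);
    }
}
```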

Changes

  1. Change close() in Fetcher to add logic that sends the final Fetch request notifying the server of the close.
  2. Change close() in Consumer to respect the timeout duration passed to it. Prior to this change, the timeout parameter was being ignored.
  3. Change tests to close with Duration.ZERO to reduce the execution time of the tests. Otherwise the tests would wait for the default timeout to exit (close() in the tests is expected to be unsuccessful because there is no server to send the request to).
  4. Distinguish between the case of "close existing session and create new session" and "close existing session" by renaming the nextCloseExisting function to nextCloseExistingAttemptNew.
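Change (2) — honoring the caller's timeout — can be sketched with a simple deadline loop. This is illustrative only: CloseTimeoutSketch and closeWithTimeout are made-up names, and the real KafkaConsumer/Fetcher close path is considerably more involved.

```java
import java.time.Duration;

// Hedged sketch of a close(timeout) that honors the caller's timeout by
// tracking a deadline while waiting for the final fetch response.
public class CloseTimeoutSketch {
    static boolean closeWithTimeout(Duration timeout) {
        long deadlineMs = System.currentTimeMillis() + timeout.toMillis();
        // Pretend to wait for the final fetch response until the deadline.
        while (System.currentTimeMillis() < deadlineMs) {
            // poll for the response here; with Duration.ZERO we never enter
            return true; // response "arrived" within the timeout
        }
        return false; // timed out, e.g. close(Duration.ZERO) with no server
    }

    public static void main(String[] args) {
        System.out.println(closeWithTimeout(Duration.ofMillis(50)));
        System.out.println(closeWithTimeout(Duration.ZERO));
    }
}
```

This is also why the tests in change (3) pass Duration.ZERO: the deadline expires immediately instead of blocking for the default timeout.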

Testing

Added unit test which validates that the correct close request is sent to the server.

Note that this change has been attempted in #5407 but the PR was abandoned.

@divijvaidya divijvaidya changed the title KAFKA-7109: Close fetch sessions on close of consumer KAFKA-7109: (draft) Close fetch sessions on close of consumer Sep 5, 2022
@divijvaidya divijvaidya force-pushed the KAFKA-7109 branch 2 times, most recently from 1b7da9d to 62ef313 Compare September 6, 2022 16:40
@divijvaidya divijvaidya changed the title KAFKA-7109: (draft) Close fetch sessions on close of consumer KAFKA-7109: Close fetch sessions on close of consumer Sep 7, 2022
@divijvaidya
Member Author

Test failures are unrelated. @showuon this is ready for your review.

Test failures:

[2022-09-06T16:49:33.427Z] org.apache.kafka.controller.QuorumControllerTest.testEarlyControllerResults() failed, log available in /home/jenkins/workspace/Kafka_kafka-pr_PR-12590/metadata/build/reports/testOutput/org.apache.kafka.controller.QuorumControllerTest.testEarlyControllerResults().test.stdout
[2022-09-06T16:49:33.427Z] 
[2022-09-06T16:49:33.427Z] QuorumControllerTest > testEarlyControllerResults() FAILED
[2022-09-06T16:49:33.427Z]     org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: exception while renouncing leadership: Attempt to resign from epoch 1 which is larger than the current epoch 0
[2022-09-06T16:49:33.427Z]         at app//org.apache.kafka.metalog.LocalLogManager.resign(LocalLogManager.java:740)


Comment on lines 1818 to 1824
Member Author

FYI reviewer

This change is required since we have added a new Fetch request as part of the close sequence. Thus, for a graceful close scenario, we need to mimic a server response to that Fetch request using the test utility client.respondFrom. The response might not necessarily come from the coordinator; instead, it will come from the node that was associated with the fetch session (which might be a node other than the coordinator, e.g. a read replica).

Member Author

FYI reviewer

It is possible that the node is no longer part of the cluster or that the connection to it has been lost. In such scenarios, we don't want to try sending a final fetch request to the server. Note that the node is not necessarily the coordinator and could be another broker (such as a read replica). The process of choosing a node to establish a fetch session is determined at https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1197-L1225
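The node-selection idea — fetch from the preferred read replica when one is known and usable, otherwise fall back to the partition leader — might look roughly like this. Node, selectReadReplica, and the connectivity flag are illustrative assumptions, not the actual Fetcher API.

```java
import java.util.Optional;

// Hedged sketch: pick the node for a fetch session. Prefer a known,
// reachable read replica; otherwise use the leader; otherwise skip.
public class FetchNodeSketch {
    record Node(int id, boolean connected) {}

    static Optional<Node> selectReadReplica(Optional<Node> preferredReplica, Node leader) {
        if (preferredReplica.isPresent() && preferredReplica.get().connected())
            return preferredReplica;
        // Fall back to the leader only if we can actually reach it.
        return leader.connected() ? Optional.of(leader) : Optional.empty();
    }

    public static void main(String[] args) {
        Node leader = new Node(1, true);
        Node replica = new Node(5, false); // replica is disconnected
        System.out.println(selectReadReplica(Optional.of(replica), leader).get().id());
    }
}
```

Under this sketch, a disconnected replica falls back to the leader, and if neither is reachable the final fetch request is simply skipped — matching the comment above.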

Member Author

FYI reviewer

Note that this is a blocking call. It sends a LEAVE_GROUP request and waits for its completion before proceeding.

Contributor

Would you mind adding that comment about the blocking nature of the close call to the source for posterity?

Contributor

Note: I think it also tries to commit offsets in a blocking way during the close.

Member Author

Yes, you are right @philipnee, and I believe the code comment above referring to "send requests to the server" covers that scenario. Do you want to suggest some action here?

Contributor

Nope, just wanted to clarify. Thanks.

@divijvaidya
Copy link
Copy Markdown
Member Author

@dajac since you are working on the consumer client protocol, perhaps you may also be interested in taking a look at this PR?

Member

@showuon showuon left a comment

@divijvaidya, thanks for the PR. I had a look and left some comments. I need more time to get familiar with how fetch sessions work. I'll review again. Thanks.

Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/requests/FetchMetadata.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/requests/FetchMetadata.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/requests/FetchMetadata.java Outdated
Member

What is the person seeing this log supposed to do? If they can't do anything, it should not be a warn.

Member Author

This scenario (when sending a close request to the server fails) should ideally not occur. It is an error condition, but we want to ignore the error since the system can recover from it (the session would be removed on the server when it times out). In such cases of recoverable errors, I prefer to add a warn so that the user can identify it as something unexpected that occurred on the system. The action that the user could take will be based on the exception trace printed here (perhaps their auth creds were incorrect?).

For the other case of log.warn a few lines below this, in the upcoming commit, I have added a suggestion to the user to increase the close timeout for KafkaConsumer.

Member

If their credentials are incorrect, they would get other errors somewhere else, right? Generally, warn errors like this tend to generate a bunch of confusion without much benefit. In a distributed system, requests are expected to fail at times. Given that we have lived for years without closing sessions, it seems unnecessary to now warn so aggressively when it fails.

Member Author

Makes sense. I changed it to info in the latest commit. I still have warn left over in the other log statement (when the close times out). I have added a suggestion to the user in the warn message to increase the timeout. Please review that log statement and let me know whether that needs to change to info too.

Member

@showuon showuon left a comment

Overall LGTM! Left some comments. Thanks.

Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
Member

@showuon showuon left a comment

LGTM! Thanks for the fix.

@dajac
Member

dajac commented Sep 20, 2022

Perhaps a naive question, but does the fetch request to close the session fetch any records? Or does it just close the session and return?

@showuon
Member

showuon commented Sep 20, 2022

Perhaps a naive question, but does the fetch request to close the session fetch any records? Or does it just close the session and return?

@dajac, good question. When the consumer is closing, it'll leave the group first, and then close the fetcher. I thought leaving the group would clear the owned partitions, but it looks like it won't. Maybe we need to update the broker side to not return records when the client is trying to close the session and not create a new one. @divijvaidya, WDYT?

@divijvaidya
Member Author

divijvaidya commented Sep 20, 2022

does the fetch request to close the session fetch any records

No, because the fetch request's topic-partitions field is set to empty at sessionHandler.newBuilder().build() (line 1963 in Fetcher.java). Also, note that the empty fetch data in the close fetch request is asserted in the test testFetcherCloseClosesFetchSessionsInBroker via assertTrue(builder.fetchData().isEmpty());

On the server side, the server handles a close fetch request by creating a SessionlessFetchContext, which will return an empty response if the fetch data is empty (see FetchSession.scala line 364).

Maybe we need to update in broker side, to not return records when client is trying to close the session and not create a new one

As explained above, both of these cases are already handled on the server by the creation of a SessionlessFetchContext.

@showuon @dajac Please let me know if I am missing anything here.
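The server-side behavior described in this thread can be sketched as follows. All class and method names here are illustrative stand-ins; the real logic lives in FetchSession.scala and handles far more cases.

```java
import java.util.List;
import java.util.Map;

// Hedged sketch: a close request (FINAL_EPOCH) with empty fetch data is
// handled "sessionless" — the session is evicted from the cache and the
// response echoes only the requested partitions, i.e. the empty set.
public class CloseFetchServerSketch {
    static final int FINAL_EPOCH = -1;

    static List<String> handleFetch(int sessionEpoch, Map<String, Integer> fetchData,
                                    Map<Integer, Object> sessionCache, int sessionId) {
        if (sessionEpoch == FINAL_EPOCH) {
            sessionCache.remove(sessionId);     // close the session
            // Respond with exactly the requested partitions, which for a
            // close request sent by the consumer is the empty set.
            return List.copyOf(fetchData.keySet());
        }
        return List.of("...records for the session's partitions...");
    }

    public static void main(String[] args) {
        Map<Integer, Object> cache = new java.util.HashMap<>();
        cache.put(42, new Object());
        List<String> response = handleFetch(FINAL_EPOCH, Map.of(), cache, 42);
        System.out.println(response.isEmpty() + " " + cache.containsKey(42));
    }
}
```

The key point from the discussion above: no records come back, not because the broker special-cases close requests, but because the close request carries no partitions to fetch.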

@showuon
Member

showuon commented Sep 21, 2022

@divijvaidya, you're right, thanks for the update! Sorry, I had only checked the broker-side implementation and saw that although we create a SessionlessFetchContext, it still returns the fetched records. I missed the part where we send the fetch request with empty fetchData. So, we're good! Thanks again for the explanation!

@divijvaidya
Member Author

The failing tests pass locally and are unrelated to this change.

Build / JDK 8 and Scala 2.12 / testReturnRecordsDuringRebalance() – org.apache.kafka.clients.consumer.KafkaConsumerTest
Build / JDK 17 and Scala 2.13 / testCreateTopicsReturnsConfigs(String).quorum=zk – kafka.api.PlaintextAdminIntegrationTest
Build / JDK 17 and Scala 2.13 / testCloseOldestConnection() – org.apache.kafka.common.network.Tls13SelectorTest

Note that testReturnRecordsDuringRebalance has been failing in other PRs too such as #12457 which makes me believe that it is not due to this change.

Member

@dajac dajac left a comment

@divijvaidya Thanks for the clarification. That seems right to me. I took a look at the PR and left a few comments/questions.

Member

What would happen if the session handler is reused after this is called? Should we add unit tests in FetchSessionHandlerTest to be complete?

Member Author

The SessionHandler should not be reused here after close because:

  1. We drain all completed fetches before closing the sessions, so no completed fetch will use a session.
  2. The Fetcher is only called from the Consumer, and the Consumer is accessed by a single thread, i.e. while it is processing the close, we don't expect it to poll or call Fetcher.sendFetches, so the session handler will not be used.
  3. The SessionHandler map is cleared after the close request is sent in Fetcher.close().
  4. We have ensured that no other thread (e.g. a FetchResponse future) can use the Fetcher while it is being closed, by acquiring a lock on the Fetcher (at synchronized (Fetcher.this)) before close starts. This ensures that no one calls a session handler before close is complete (which clears the sessionHandler map).

Is my understanding correct here?

Regarding the test, what kind of validation/assertion would you like to see from it? I can't think of a test that might be useful for us here.
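Point 4 above — close() and the response-handling path sharing one monitor — can be sketched like this. The structure is illustrative only, not the actual Fetcher code.

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch: close() takes the same lock as the response path, so a
// late FetchResponse cannot observe a half-closed Fetcher.
public class SynchronizedCloseSketch {
    private final Map<Integer, String> sessionHandlers = new HashMap<>();
    private boolean closed = false;

    void handleResponse(int sessionId) {
        synchronized (this) {               // same monitor as close()
            if (closed) return;             // session map already cleared
            sessionHandlers.get(sessionId); // safe: close() can't run concurrently
        }
    }

    void close() {
        synchronized (this) {
            // send the final fetch requests here, then:
            sessionHandlers.clear();
            closed = true;
        }
    }

    boolean isClosed() { return closed; }

    public static void main(String[] args) {
        SynchronizedCloseSketch f = new SynchronizedCloseSketch();
        f.close();
        f.handleResponse(7); // no-op after close, no NPE
        System.out.println(f.isClosed());
    }
}
```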

Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
@divijvaidya
Member Author

@dajac this is ready for your review, whenever you get a chance

@divijvaidya
Member Author

@dajac please take a look when you get a chance!

Contributor

@kirktrue kirktrue left a comment

Thanks @divijvaidya!

Thank you for adding clarity in your method naming and in the comments you added (especially in the tests). That in addition to fixing the issue.

Comment thread clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java Outdated

Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
@divijvaidya
Member Author

@kirktrue @dajac please take a look when you get a chance!

@divijvaidya
Member Author

@dajac please take a look!

@divijvaidya
Member Author

@ijuma would you please take a look when you get a chance?

Member

@dajac dajac left a comment

@divijvaidya Please excuse me for the delay on this one. I picked it up again and left a few comments/questions. I think that it is too late for 3.4, but let's get it merged before the Xmas holidays.

Comment thread clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/requests/FetchMetadata.java Outdated
Member

Are we losing anything by removing all those try-with-resources? It basically means that the consumer is not closed in case of an exception.

Member Author

Yes. try-with-resources calls KafkaConsumer.close(), which has a default timeout of 30s. These tests would wait for the default timeout to exit (close() in the tests is expected to be unsuccessful because there is no server to send the request to) and hence unnecessarily increase the execution time of these tests.

Member

Is the third one not necessary?

Member Author

Yes, idempotency can be verified by two calls to close(); the third one just adds to the total running time of this test.

Member Author

I have modified this test to verify using deterministic methods.

Comment thread clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java Outdated
Comment on lines 5226 to 5228
Member

If we are doing this, could we adopt the new style directly:

new Fetcher(
    a,
    b,
    ....
);

Member Author

Sorry, I did not understand this comment. Was your suggestion about the indentation of the next line? I have reverted it to the original, i.e. an 8-space indent.

Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
Comment thread clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated

Contributor

Mind elaborating on when the time object can be null? Maybe when we inject the time object?

Member Author

The close() could be called from the constructor of this class itself when an exception is thrown before the field this.time is initialized. I could avoid that by assigning the field as the first thing in the constructor, but then we would be coupling the close method to the initialization order of fields in the constructor. Instead, to be on the safer side, a better approach IMO is to handle the null case explicitly.

I have added a comment explaining when it could be null in the upcoming commit.

Contributor

How about Objects.requireNonNull(time) in the constructor? That way we could avoid the null handling.
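The two options discussed here — a defensive null check in close() versus failing fast with Objects.requireNonNull in the constructor — can be contrasted in a small sketch. TimeNullSketch and its Time interface are stand-ins, not the real KafkaConsumer types.

```java
import java.util.Objects;

// Hedged sketch: fail fast on a null Time so no partially-initialized
// instance can reach close(); the defensive check then becomes redundant.
public class TimeNullSketch {
    interface Time { long milliseconds(); }

    private final Time time;

    TimeNullSketch(Time time) {
        // requireNonNull throws before any other constructor work runs.
        this.time = Objects.requireNonNull(time, "time");
    }

    long nowOrZero() {
        // With requireNonNull above this null check is unnecessary; without
        // it, close()-time code would have to tolerate time == null.
        return time == null ? 0L : time.milliseconds();
    }

    public static void main(String[] args) {
        TimeNullSketch ok = new TimeNullSketch(System::currentTimeMillis);
        System.out.println(ok.nowOrZero() > 0);
        try {
            new TimeNullSketch(null);
        } catch (NullPointerException e) {
            System.out.println("rejected null time");
        }
    }
}
```

The trade-off mentioned in the thread remains: requireNonNull only helps if no exception path can invoke close() before the assignment runs, which is exactly the initialization-order coupling the author wanted to avoid.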

@divijvaidya
Member Author

@dajac @philipnee please review again (and restart the tests) when you get a chance! Thank you.

Unrelated test failures. Unit tests are successful in my local environment. The failing integration tests are known flaky tests.

Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest
2m 0s
Build / JDK 11 and Scala 2.13 / testPatternSubscriptionWithTopicAndGroupRead(String).quorum=kraft – kafka.api.AuthorizerIntegrationTest
6s
Build / JDK 8 and Scala 2.12 / testListenerConnectionRateLimitWhenActualRateAboveLimit() – kafka.network.ConnectionQuotasTest

@philipnee
Contributor

Thanks @divijvaidya - I don't have more questions regarding this PR.

@showuon
Member

showuon commented Feb 8, 2023

@dajac , do you have any other comments?

Member

@dajac dajac left a comment

@divijvaidya Thanks for the patch and my apologies for the delay on it. I had very limited time for reviews. Overall, the patch LGTM. I left a few nits. We should be able to merge it once they are addressed.

Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/utils/LambdaUtils.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/utils/LambdaUtils.java Outdated
Member

@dajac dajac left a comment

@divijvaidya Thanks for the update. I left a few more minor comments.

Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java Outdated
try {
    code.run();
} catch (Throwable t) {
    log.warn("{} error", what, t);
Member

Should this be an error instead of a warn, to be consistent with closeQuietly? Moreover, I wonder if we could improve the error message. We would get something like "fetcher close error", which is not really in line with what we usually log. For instance, closeQuietly would log something like "Failed to close fetch...". Do you have any thoughts on this?

Member Author

Fair point. Though not all "swallows" will require the error level. So I created a generic logging function based on CoreUtils.swallow and then used error for this specific instance of closing the fetcher and coordinator. I have also updated the comment.

Let me know if that looks right. Happy to change it further.
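A generic swallow-style helper of the kind described — run an action, swallow any Throwable, log it at a caller-chosen level — could look like the following. This is modeled on the idea of CoreUtils.swallow; all names here are illustrative, not the merged code.

```java
import java.util.function.Consumer;

// Hedged sketch: swallow exceptions from best-effort cleanup, letting the
// caller choose how loudly (error vs. info) each failure is logged.
public class SwallowSketch {
    static void swallow(Runnable action, String name, Consumer<String> logAtLevel) {
        try {
            action.run();
        } catch (Throwable t) {
            // Message style follows the closeQuietly convention discussed above.
            logAtLevel.accept("Failed to close " + name + ": " + t);
        }
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        swallow(() -> { throw new RuntimeException("boom"); }, "fetcher", log::append);
        swallow(() -> {}, "coordinator", log::append); // success: nothing logged
        System.out.println(log);
    }
}
```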

Comment thread core/src/test/java/kafka/testkit/KafkaClusterTestKit.java Outdated
Member

@dajac dajac left a comment

LGTM, thanks for the patch @divijvaidya!

@dajac dajac merged commit e903f2c into apache:trunk Feb 9, 2023
@divijvaidya
Member Author

Thank you @dajac for your patience through this PR. It took a long time, but it will definitely improve consumers! Cheers!

@divijvaidya divijvaidya deleted the KAFKA-7109 branch February 10, 2023 09:46
@dajac
Member

dajac commented Feb 10, 2023

@divijvaidya Sorry again for the long delay. By the way, I was wondering if we should also do this in the AbstractFetcherThread in order to close sessions used by replication when a broker shuts down. I haven't looked into it but that may be an interesting improvement as well.

@divijvaidya
Member Author

@divijvaidya Sorry again for the long delay. By the way, I was wondering if we should also do this in the AbstractFetcherThread in order to close sessions used by replication when a broker shuts down. I haven't looked into it but that may be an interesting improvement as well.

Thanks for the suggestion. I will look into it.

guozhangwang pushed a commit that referenced this pull request Feb 17, 2023
#13248)

I noticed this issue when tracing #12590.

StreamThread closes the consumer before changing state to DEAD. If the partition rebalance happens quickly, the other StreamThreads can't change KafkaStream state from REBALANCING to RUNNING since there is a PENDING_SHUTDOWN StreamThread

Reviewers: Guozhang Wang <wangguoz@gmail.com>
@dengziming
Member

dengziming commented Oct 17, 2023

https://issues.apache.org/jira/browse/KAFKA-15619

Deleted topics come back again in the Apache Spark structured streaming stress test after upgrading Kafka from 3.4.0 to 3.5.0; the related ticket is https://issues.apache.org/jira/browse/SPARK-45529 . The test randomly starts/stops/adds data/adds partitions/deletes a topic/adds a topic/checks the result in a loop, and I finally found that a deleted topic will come back again after some time.

By constantly resetting the head of branch-3.5, using gradlew install to repackage, and rerunning the stress test, I am basically certain that this submission caused it.

I haven't gone through the details of the PR yet; do you have any ideas @divijvaidya @dajac @showuon?

@showuon
Member

showuon commented Oct 17, 2023

@dengziming , let's discuss it in the KAFKA-15619

