Skip to content

KAKFA-8950: Fix KafkaConsumer Fetcher breaking on concurrent disconnect#7511

Merged
rajinisivaram merged 3 commits intoapache:trunkfrom
wiggzz:kafka-8950-fix-kafkaconsumer-stops-fetching
Oct 17, 2019
Merged

KAKFA-8950: Fix KafkaConsumer Fetcher breaking on concurrent disconnect#7511
rajinisivaram merged 3 commits intoapache:trunkfrom
wiggzz:kafka-8950-fix-kafkaconsumer-stops-fetching

Conversation

@wiggzz
Copy link
Copy Markdown
Contributor

@wiggzz wiggzz commented Oct 14, 2019

Background

The KafkaConsumer Fetcher can sometimes get into an invalid state where it believes that there are ongoing fetch requests, but in fact there are none.

This leads to the Consumer continuing to long poll as if there are no messages available from the broker, and continuing to heartbeat and stay a part of the consumer group, but never receiving any new information from the Broker about the state of the partition or any new messages.

This may be caused by the heartbeat thread concurrently handling a disconnection event just after the fetcher thread submits a request which would cause the Fetcher to enter an invalid state where it believes it has ongoing requests to the disconnected node but in fact it does not. This is due to a thread safety issue in the Fetcher where it was possible for the ordering of the modifications to the nodesWithPendingFetchRequests to be incorrect - the Fetcher was adding it after the listener had already been invoked, which would mean that pending node never gets removed again.

Changes

This PR addresses that thread safety issue by ensuring that the pending node is added to the nodesWithPendingFetchRequests before the listener is added to the future, ensuring the finally block is called after the node is added.

Tests

I added a unit test which simulates the concurrent disconnect. The test validates that after the disconnections, the Fetcher is still able to send a request. In the case where we hit the threading issue, sendFetches will return 0 because it still thinks there are pending requests even though there are not. I modified the MockClient to add a new wakeupHook that gets invoked (if present) every time client.wakeup is called. This allows us to simulate another thread's actions during the ConsumerNetworkClient#send. Happy to find other ways of simulating the threading problems.

Thanks to @thomaslee for collaborating on the solution for this.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Copy Markdown
Contributor

@thomaslee thomaslee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the speedy turn-around on this @wiggzz, we'll be giving it a shot as soon as we're able.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if the subtleties of this fairly benign-looking code warrant a comment here or in the test?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was hoping the test would catch the issues with it - but I guess it's possible in the future some change causes the test to be less useful. Could stick a comment to the effect of "ensure nodesWithPendingFetchRequests is always present before finalizer could be called" or something.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was also wondering if there could ever be an exception thrown by addListener which would cause the listener to not be added or the completion handler to not be called?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was also wondering if there could ever be an exception thrown by addListener which would cause the listener to not be added or the completion handler to not be called?

Hm good question ... find it hard to imagine as implemented unless we end up with multiple listeners executing on the consumer thread & a listener that precedes this one throws or something along those lines. And in that scenario right now I think we'd expect the exception to bubble out of KafkaConsumer.poll(), which would at least give us a clear signal that something went terribly wrong.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We rely on addListener always resuling in listener being invoked in various places. So the code should be ok. But a comment that explains the sequence and exception scenarios would be useful.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I've added a comment describing the rationale behind the sequence. If this looks good, what are the next steps for getting this merged?

@wiggzz wiggzz force-pushed the kafka-8950-fix-kafkaconsumer-stops-fetching branch from 5c925d5 to b816ed1 Compare October 14, 2019 20:47
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is moved above to https://github.com/apache/kafka/pull/7511/files#diff-b45245913eaae46aa847d2615d62cde0R262 the rest of the changes in this file are indentation changes.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(@thomaslee reminds me that send() can throw and then we're left with a dangling reference)

@wiggzz wiggzz force-pushed the kafka-8950-fix-kafkaconsumer-stops-fetching branch from 4687e8b to 2b1bca7 Compare October 15, 2019 17:07
@wiggzz
Copy link
Copy Markdown
Contributor Author

wiggzz commented Oct 15, 2019

retest this please

@thomaslee
Copy link
Copy Markdown
Contributor

FYI we're in the process of canarying a custom build of 2.3.0 with this change so should be able to say for sure whether we're having a better time with it soonish, but I think the sooner this lands on trunk the better.

@wiggzz wiggzz force-pushed the kafka-8950-fix-kafkaconsumer-stops-fetching branch from 2b1bca7 to 3bd1bd3 Compare October 16, 2019 14:21
Copy link
Copy Markdown
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks for the fix!

@hachikuji
Copy link
Copy Markdown
Contributor

retest this please

Copy link
Copy Markdown
Contributor

@rajinisivaram rajinisivaram left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wiggzz Thanks for the update, LGTM

@rajinisivaram
Copy link
Copy Markdown
Contributor

Test failures are unrelated and there was a good build before the PR that added comments, so merging to trunk, 2.4 and 2.3.

@rajinisivaram rajinisivaram merged commit 2cf7b35 into apache:trunk Oct 17, 2019
rajinisivaram pushed a commit that referenced this pull request Oct 17, 2019
…ct (#7511)

The KafkaConsumer Fetcher can sometimes get into an invalid state where it believes that there are ongoing fetch requests, but in fact there are none. This may be caused by the heartbeat thread concurrently handling a disconnection event just after the fetcher thread submits a request which would cause the Fetcher to enter an invalid state where it believes it has ongoing requests to the disconnected node but in fact it does not. This is due to a thread safety issue in the Fetcher where it was possible for the ordering of the modifications to the nodesWithPendingFetchRequests to be incorrect - the Fetcher was adding it after the listener had already been invoked, which would mean that pending node never gets removed again.

This PR addresses that thread safety issue by ensuring that the pending node is added to the nodesWithPendingFetchRequests before the listener is added to the future, ensuring the finally block is called after the node is added.

Reviewers: Tom Lee, Jason Gustafson <jason@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
rajinisivaram pushed a commit that referenced this pull request Oct 17, 2019
…ct (#7511)

The KafkaConsumer Fetcher can sometimes get into an invalid state where it believes that there are ongoing fetch requests, but in fact there are none. This may be caused by the heartbeat thread concurrently handling a disconnection event just after the fetcher thread submits a request which would cause the Fetcher to enter an invalid state where it believes it has ongoing requests to the disconnected node but in fact it does not. This is due to a thread safety issue in the Fetcher where it was possible for the ordering of the modifications to the nodesWithPendingFetchRequests to be incorrect - the Fetcher was adding it after the listener had already been invoked, which would mean that pending node never gets removed again.

This PR addresses that thread safety issue by ensuring that the pending node is added to the nodesWithPendingFetchRequests before the listener is added to the future, ensuring the finally block is called after the node is added.

Reviewers: Tom Lee, Jason Gustafson <jason@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
satishd pushed a commit to harshach/kafka that referenced this pull request Oct 18, 2019
…ct (apache#7511)

The KafkaConsumer Fetcher can sometimes get into an invalid state where it believes that there are ongoing fetch requests, but in fact there are none. This may be caused by the heartbeat thread concurrently handling a disconnection event just after the fetcher thread submits a request which would cause the Fetcher to enter an invalid state where it believes it has ongoing requests to the disconnected node but in fact it does not. This is due to a thread safety issue in the Fetcher where it was possible for the ordering of the modifications to the nodesWithPendingFetchRequests to be incorrect - the Fetcher was adding it after the listener had already been invoked, which would mean that pending node never gets removed again.

This PR addresses that thread safety issue by ensuring that the pending node is added to the nodesWithPendingFetchRequests before the listener is added to the future, ensuring the finally block is called after the node is added.

Reviewers: Tom Lee, Jason Gustafson <jason@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
@thomaslee
Copy link
Copy Markdown
Contributor

@wiggzz @rajinisivaram fyi we've been running this patch for several days now in multiple services & happy to report no stuck consumers since. Thanks a ton for the speedy turn-around on all this. Looking forward to 2.3.1.

sdandu-gh added a commit to confluentinc/kafka that referenced this pull request Nov 9, 2019
* KAFKA-8649: send latest commonly supported version in assignment (apache#7425)

Reviewers: Matthias J. Sax <matthias@confluent.io>

* KAFKA-8262, KAFKA-8263: Fix flaky test `MetricsIntegrationTest` (apache#6922) (apache#7457)

- Timeout occurred due to initial slow rebalancing.
- Added code to wait until `KafkaStreams` instance is in state RUNNING to check registration of metrics and in state NOT_RUNNING to check deregistration of metrics.
- I removed all other wait conditions, because they are not needed if `KafkaStreams` instance is in the right state.

Reviewers: Guozhang Wang <wangguoz@gmail.com>

* MINOR: clarify wording around fault-tolerant state stores (apache#7510)

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>

* MINOR: Port changes from trunk for test stability to 2.3 branch (apache#7424)

Reviewer: Matthias J. Sax <matthias@confluent.io>

* KAFKA-9014: Fix AssertionError when SourceTask.poll returns an empty list (apache#7491)

Author: Konstantine Karantasis <konstantine@confluent.io>
Reviewer: Randall Hauch <rhauch@gmail.com>

* KAFKA-8945/KAFKA-8947: Fix bugs in Connect REST extension API (apache#7392)

Fix bug in Connect REST extension API caused by invalid constructor parameter validation, and update integration test to play nicely with Jenkins

Fix instantiation of TaskState objects by Connect framework.

Author: Chris Egerton <chrise@confluent.io>
Reviewers: Magesh Nandakumar <mageshn@confluent.io>, Randall Hauch <rhauch@gmail.com>

* KAFKA-8340, KAFKA-8819: Use PluginClassLoader while statically initializing plugins (apache#7315)

Added plugin isolation unit tests for various scenarios, with a `TestPlugins` class that compiles and builds multiple test plugins without them being on the classpath and verifies that the Plugins and DelegatingClassLoader behave properly. These initially failed for several cases, but now pass since the issues have been fixed.

KAFKA-8340 and KAFKA-8819 are closely related, and this fix corrects the problems reported in both issues.

Author: Greg Harris <gregh@confluent.io>
Reviewers: Chris Egerton <chrise@confluent.io>, Magesh Nandakumar <mageshn@confluent.io>, Konstantine Karantasis <konstantine@confluent.io>, Randall Hauch <rhauch@gmail.com>

* MINOR: log reason for fatal error in locking state dir (apache#7534)

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>

* KAKFA-8950: Fix KafkaConsumer Fetcher breaking on concurrent disconnect (apache#7511)

The KafkaConsumer Fetcher can sometimes get into an invalid state where it believes that there are ongoing fetch requests, but in fact there are none. This may be caused by the heartbeat thread concurrently handling a disconnection event just after the fetcher thread submits a request which would cause the Fetcher to enter an invalid state where it believes it has ongoing requests to the disconnected node but in fact it does not. This is due to a thread safety issue in the Fetcher where it was possible for the ordering of the modifications to the nodesWithPendingFetchRequests to be incorrect - the Fetcher was adding it after the listener had already been invoked, which would mean that pending node never gets removed again.

This PR addresses that thread safety issue by ensuring that the pending node is added to the nodesWithPendingFetchRequests before the listener is added to the future, ensuring the finally block is called after the node is added.

Reviewers: Tom Lee, Jason Gustafson <jason@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>

* Fix bug in AssignmentInfo#encode and add additional logging (apache#7545)

Same as apache#7537
but targeted at 2.3 for cherry-pick
Reviewers: Bill Bejeck <bbejeck@gmail.com>

* Bump version to 2.3.1

* Update versions to 2.3.2-SNAPSHOT

* HOTFIX: fix bug in VP test where it greps for the wrong log message (apache#7643)

Reviewers: Guozhang Wang <wangguoz@gmail.com>
radai-rosenblatt added a commit to linkedin/kafka that referenced this pull request Apr 15, 2020
…tching (#79)

TICKET=KAFKA-8950

   EXIT_CRITERIA = when code is rebased on top of 2.4+ to get the upstream fix

   below is the original commit from commit c8676c9:
KAKFA-8950: Fix KafkaConsumer Fetcher breaking on concurrent disconnect (apache#7511)

The KafkaConsumer Fetcher can sometimes get into an invalid state where it believes that there are ongoing fetch requests, but in fact there are none. This may be caused by the heartbeat thread concurrently handling a disconnection event just after the fetcher thread submits a request which would cause the Fetcher to enter an invalid state where it believes it has ongoing requests to the disconnected node but in fact it does not. This is due to a thread safety issue in the Fetcher where it was possible for the ordering of the modifications to the nodesWithPendingFetchRequests to be incorrect - the Fetcher was adding it after the listener had already been invoked, which would mean that pending node never gets removed again.

This PR addresses that thread safety issue by ensuring that the pending node is added to the nodesWithPendingFetchRequests before the listener is added to the future, ensuring the finally block is called after the node is added.

Reviewers: Tom Lee, Jason Gustafson <jason@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>

Co-authored-by: Will James <jameswt@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants