Skip to content

KAFKA-16954: fix consumer close to release assignment in background#16343

Merged
lucasbru merged 3 commits intoapache:trunkfrom
lianetm:KAFKA-16954-subscription-fix-close
Jun 17, 2024
Merged

KAFKA-16954: fix consumer close to release assignment in background#16343
lucasbru merged 3 commits intoapache:trunkfrom
lianetm:KAFKA-16954-subscription-fix-close

Conversation

@lianetm
Copy link
Copy Markdown
Member

@lianetm lianetm commented Jun 14, 2024

This PR fixes consumer close to avoid updating the subscription state object in the app thread. Now the close simply triggers an UnsubscribeEvent that is handled in the background to trigger callbacks, clear assignment, and send leave heartbeat. Note that after triggering the event, the unsubscribe will continuously process background events until the event completes, to ensure that it allows for callbacks to run in the app thread.

The logic around what happens if the unsubscribe fails remain unchanged: close will log, keep the first exception and carry on.

It also removes the redundant LeaveOnClose event (it used to do the the exact same thing as the UnsubscribeEvent, both calling membershipMgr.leaveGroup).

@lianetm
Copy link
Copy Markdown
Member Author

lianetm commented Jun 14, 2024

Hey @lucasbru , this is the follow-up fix for the related race around close updating the subscription state in the app thread. PTAL. Thanks!
cc. @dajac

@lianetm
Copy link
Copy Markdown
Member Author

lianetm commented Jun 14, 2024

@jlprat we would need this blocker into 3.8. Note this is not production-ready code since KIP-848 in only going out as Preview in 3.8 (so pushing this fix for a race condition affecting consumer close that we discovered, to make KIP-848 preview better). Makes sense? Thanks!

@jlprat
Copy link
Copy Markdown
Contributor

jlprat commented Jun 17, 2024

Is there any way to have this merged before we start making RCs for 3.8.0?

@lucasbru
Copy link
Copy Markdown
Member

@lianetm The general idea looks good to me, but we have a lot of failing integration tests with this change.

@lianetm
Copy link
Copy Markdown
Member Author

lianetm commented Jun 17, 2024

Hey @lucasbru , solved. Issue was that all those integration tests do close with zero timeout, and in that scenario the legacy logic does not propagate a timeout exception while leaving the group (and I was propagating it here). The legacy logic just triggers the leave and waits if it has time. If it doesn't, it just logs a warn here). So I just did the same on the new consumer. Tried several of the integration test files that were failing and they pass locally now. I'm running them all now while the build here completes. Thanks!

@lianetm
Copy link
Copy Markdown
Member Author

lianetm commented Jun 17, 2024

Hey @lucasbru , build completed with 6 unrelated test failures:

Build / JDK 8 and Scala 2.12 / testReplicateFromLatest() – org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest
Build / JDK 8 and Scala 2.12 / testCleanerThreadShutdown() – kafka.log.remote.RemoteIndexCacheTest
Build / JDK 21 and Scala 2.13 / testFenceMultipleBrokers() – org.apache.kafka.controller.QuorumControllerTest
Build / JDK 21 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest
Build / JDK 21 and Scala 2.13 / testAlterSinkConnectorOffsetsOverriddenConsumerGroupId() – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
Build / JDK 21 and Scala 2.13 / testMigrateTopicDeletions [9] Type=ZK, MetadataVersion=4.0-IV0,Security=PLAINTEXT – kafka.zk.ZkMigrationIntegrationTest

Thanks!

Copy link
Copy Markdown
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

@lucasbru lucasbru merged commit 6c4e777 into apache:trunk Jun 17, 2024
lianetm added a commit to lianetm/kafka that referenced this pull request Jun 17, 2024
…pache#16343)

This PR fixes consumer close to avoid updating the subscription state object in the app thread. Now the close simply triggers an UnsubscribeEvent that is handled in the background to trigger callbacks, clear assignment, and send leave heartbeat. Note that after triggering the event, the unsubscribe will continuously process background events until the event completes, to ensure that it allows for callbacks to run in the app thread.
The logic around what happens if the unsubscribe fails remain unchanged: close will log, keep the first exception and carry on.

It also removes the redundant LeaveOnClose event (it used to do the the exact same thing as the UnsubscribeEvent, both calling membershipMgr.leaveGroup).

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
lianetm added a commit to lianetm/kafka that referenced this pull request Jun 17, 2024
…pache#16343)

This PR fixes consumer close to avoid updating the subscription state object in the app thread. Now the close simply triggers an UnsubscribeEvent that is handled in the background to trigger callbacks, clear assignment, and send leave heartbeat. Note that after triggering the event, the unsubscribe will continuously process background events until the event completes, to ensure that it allows for callbacks to run in the app thread.
The logic around what happens if the unsubscribe fails remain unchanged: close will log, keep the first exception and carry on.

It also removes the redundant LeaveOnClose event (it used to do the the exact same thing as the UnsubscribeEvent, both calling membershipMgr.leaveGroup).

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
lucasbru pushed a commit that referenced this pull request Jun 18, 2024
…16376)

* MINOR: Improving log for outstanding requests on close and cleanup (#16304)

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>

* KAFKA-16954: fix consumer close to release assignment in background (#16343)

This PR fixes consumer close to avoid updating the subscription state object in the app thread. Now the close simply triggers an UnsubscribeEvent that is handled in the background to trigger callbacks, clear assignment, and send leave heartbeat. Note that after triggering the event, the unsubscribe will continuously process background events until the event completes, to ensure that it allows for callbacks to run in the app thread.
The logic around what happens if the unsubscribe fails remain unchanged: close will log, keep the first exception and carry on.

It also removes the redundant LeaveOnClose event (it used to do the the exact same thing as the UnsubscribeEvent, both calling membershipMgr.leaveGroup).

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
TaiJuWu pushed a commit to TaiJuWu/kafka that referenced this pull request Jul 4, 2024
…pache#16343)

This PR fixes consumer close to avoid updating the subscription state object in the app thread. Now the close simply triggers an UnsubscribeEvent that is handled in the background to trigger callbacks, clear assignment, and send leave heartbeat. Note that after triggering the event, the unsubscribe will continuously process background events until the event completes, to ensure that it allows for callbacks to run in the app thread.
The logic around what happens if the unsubscribe fails remain unchanged: close will log, keep the first exception and carry on.

It also removes the redundant LeaveOnClose event (it used to do the the exact same thing as the UnsubscribeEvent, both calling membershipMgr.leaveGroup).

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
@lucasbru lucasbru added the ctr Consumer Threading Refactor (KIP-848) label Jan 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ctr Consumer Threading Refactor (KIP-848)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants