Fix #857: Paused consumption after rebalance with multiple consumers#29
Fix #857: Paused consumption after rebalance with multiple consumers#29astubbs wants to merge 12 commits intomaster-confluentfrom
Conversation
|
@astubbs, is there any way to get a SNAPSHOT version so we could test? Or perhaps you will release a new version? |
Hi, yup! I’m going to try get the build pipeline on my fork to start releasing tomorrow, everything’s in place I just need to wrap it all up. I will start releasing snapshots, and release a full new release too. |
c054ca9 to
a6ce9bf
Compare
Dependency ReviewThe following issues were found:
License Issues.github/workflows/performance.yml
.github/workflows/publish.yml
parallel-consumer-examples/parallel-consumer-example-core/pom.xml
parallel-consumer-examples/parallel-consumer-example-metrics/pom.xml
parallel-consumer-examples/parallel-consumer-example-reactor/pom.xml
parallel-consumer-examples/parallel-consumer-example-streams/pom.xml
parallel-consumer-examples/parallel-consumer-example-vertx/pom.xml
parallel-consumer-examples/pom.xml
parallel-consumer-mutiny/pom.xml
parallel-consumer-reactor/pom.xml
parallel-consumer-vertx/pom.xml
pom.xml
OpenSSF ScorecardScorecard details
Scanned Files
|
✅ Duplicate Code ReportTwo engines run in parallel for cross-validation. Each has its own thresholds tuned to its baseline - the real safety net is the per-engine "max increase vs base" check. ✅ PMD CPD
|
| PR | Base | Change | |
|---|---|---|---|
| Clones | 76 | 78 | 🙂 -2 |
| Duplicated lines | 1098 | 1127 | ❤️ -29 |
| Duplication | 3.70% | 3.85% | 👍 -0.15% |
| Rule | Limit | Status |
|---|---|---|
| Max duplication | 4% | ✅ Pass (3.70%) |
| Max increase vs base | +0.1% | ✅ Pass (-0.15%) |
⚠️ 8 new clones introduced
- 17 lines:
parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/EpochAndRecordsMapRaceTest.java:37<->parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ShardManagerStaleContainerTest.java:35 - 15 lines:
parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithEarlyClose.java:1<->parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithSaslAuthenticationException.java:1 - 14 lines:
parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithEarlyClose.java:64<->parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithSaslAuthenticationException.java:80 - 15 lines:
parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithCommitTimeoutException.java:121<->parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithEarlyClose.java:121 - 21 lines:
parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTest.java:51<->parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithSaslAuthenticationException.java:97 - 12 lines:
parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTest.java:72<->parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithSaslAuthenticationException.java:121 - 16 lines:
parallel-consumer-reactor/pom.xml:16<->parallel-consumer-vertx/pom.xml:21 - 18 lines:
parallel-consumer-mutiny/pom.xml:20<->parallel-consumer-vertx/pom.xml:20
Powered by astubbs/duplicate-code-cross-check
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master-confluent #29 +/- ##
===================================================
Coverage ? 77.86%
Complexity ? 1201
===================================================
Files ? 83
Lines ? 4333
Branches ? 392
===================================================
Hits ? 3374
Misses ? 769
Partials ? 190
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
❌ Mutation Testing (PIT) ReportPIT did not produce a report. Most commonly this means a test failed in the baseline (PIT runs all tests unmodified first to establish green) and PIT aborted before mutating. See the "Run PIT mutation testing" step logs for the failing test, then either fix it or add it to |
Republish under new Maven coordinates so the fork can be released to Maven Central independently of upstream confluentinc/parallel-consumer (which is no longer actively maintained). Changes: - groupId: io.confluent.parallelconsumer -> io.github.astubbs.parallelconsumer - version: 0.5.3.4-SNAPSHOT -> 0.6.0.0-SNAPSHOT - organization: Confluent, Inc. -> Antony Stubbs - SCM URLs: confluentinc -> astubbs - developer: antony.stubbs@gmail.com - Display names: drop "Confluent" prefix - License header template: now "Confluent, Inc. and contributors" (preserves original Apache 2.0 attribution while crediting fork contributors going forward) - Add NOTICE file per Apache 2.0 §4(d) crediting Confluent as the original author - README updated with new coordinates and a note explaining the fork Java packages (io.confluent.parallelconsumer.*) and artifactIds are intentionally unchanged so cherry-picking PRs back upstream stays clean if they ever resume maintenance. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…cture Replace Confluent's Semaphore CI with GitHub Actions. Includes: - Parallel PR builds (unit/integration/performance) with fail-fast - Quality gates: PMD CPD duplicate detection, file similarity check, SpotBugs, dependency vulnerability scanning, PIT mutation testing - Maven cache with rotating save key to prevent stale-cache trap - Maven HTTP timeouts (.mvn/maven.config) for connection reliability - Codecov coverage tracking with drop prevention - Claude Code Review and PR dependency check workflows - Performance test infrastructure with self-hosted runner support - Build scripts (bin/ci-build.sh, ci-unit-test.sh, etc.) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…IT stall - EpochAndRecordsMap: skip records for partitions with no epoch assigned yet (race between poll and onPartitionsAssigned) - VertxConcurrencyIT: increase wait timeout and add CountDownLatch protection to prevent CI stalls Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ditions Root-caused PIT's ~85s baseline cliff: MockConsumerTestWithEarlyClose and CommitTimeoutException leaked non-daemon threads that threw IllegalStateException on closed MockConsumer during the next test. Fixes: - MockConsumerTestWithEarlyClose/CommitTimeoutException: daemon threads, interrupt in finally, catch IllegalStateException in addRecords loop - MockConsumerTestWithSaslAuthenticationException: scope Awaitility timeout locally, add @AfterEach close, shrink mock-failure window - ParallelEoSSStreamProcessorRebalancedTest: remove empty close() override hiding base-class cleanup - ProducerManagerTest: @AfterEach Awaitility.reset(), explicit timeouts - PCMetricsTest: explicit atMost(120s) on bare await() calls - PartitionOrderProcessingTest: merge await + assertion to fix race - Base class: Awaitility.reset() in @AfterEach as general guard - New EpochAndRecordsMapRaceTest for null-epoch guard Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…docs - Rename from "Confluent Parallel Consumer" to "Kafka Parallel Consumer" - Point issue tracker to astubbs/parallel-consumer, remove Confluent Slack link - Add upstream PR analysis report - Add CHANGELOG 0.6.0.0 section - Fix AGENTS.md CI description, add build scripts and copyright rules - Add Documented Solutions section to AGENTS.md - Add solution docs: copyright header rules, CI security hardening - Remove duplicate Releasing section in README_TEMPLATE.adoc - Add multi-partition partial-skip test to EpochAndRecordsMapRaceTest - Fix deriveCpKafkaImage Javadoc comment Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Switch check-dependencies.yml from pull_request_target to pull_request to prevent fork PRs running with base repo write permissions - Pin all three custom actions to commit SHAs instead of mutable branch refs (dependencies-action@a09974c, duplicate-code-cross-check@d3140ef, duplicate-code-detection-tool@4e302e7) - Gate publish.yml on CI success via workflow_run trigger instead of deploying on every master push regardless of test results - Add try/catch and pre-release suffix stripping to deriveCpKafkaImage() so unexpected version strings fall back gracefully instead of crashing all integration tests with ExceptionInInitializerError - Add Documented Solutions section to AGENTS.md for discoverability - Add copyright header management solution doc Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…epare-deps cache warming PR builds now run two tiers in parallel: - Split suites (unit, integration, performance) on default Kafka 3.9.1 - Experimental Kafka 4.x compatibility check (non-blocking) Push-to-master runs a single ci-build.sh (default Kafka version) to gate SNAPSHOT publishing. No more version matrix on master. All jobs now use explicit cache/restore with rotating keys from prepare-deps. Eliminated all setup-java cache: 'maven' usage which caused the frozen-cache bug (immutable keys could never overwrite an incomplete cache entry). prepare-deps now runs on push builds too, so master hits the reactor with a warm Maven Central cache for the vertx dependencies that previously caused consistent 120s timeouts. Also skip unit tests in performance-test.sh so a flaky unit test can't cascade-cancel the other PR matrix jobs via fail-fast. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…plugin
Sets up the fork to publish both snapshot and release artifacts to Maven
Central (new Sonatype Central Portal) under the io.github.astubbs namespace.
Pipeline: a push to master triggers Build and Test; on success, the Publish
workflow deploys via `./mvnw -Pmaven-central clean deploy`. The
central-publishing-maven-plugin (0.10.0) handles both paths:
- SNAPSHOT versions: direct PUT to the snapshot endpoint
(central.sonatype.com/repository/maven-snapshots/)
- RELEASE versions: bundle + validate + auto-publish via Portal API,
plus git tag and GitHub release
The examples module and its children are excluded from deploy (sample code,
not library artifacts): via -pl on the mvn command, and via
maven.deploy.skip, maven.install.skip, gpg.skip, and
central-publishing-plugin skipPublishing=true in the examples pom.
maven-jar-plugin test-jar execution gets skipIfEmpty=true so the parent
pom and examples (no test classes) do not emit empty signed test-jars
that Sonatype rejects. Real test-jars for core, vertx, reactor, and mutiny
publish unchanged.
Prerequisite on the Central Portal: the io.github.astubbs namespace must
have both verification AND the separate "Enable SNAPSHOT Publishing"
toggle switched on. Without the snapshot toggle the snapshot endpoint
returns 403 even with valid credentials.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Rebrand audit -- pom, README template, drop upstream-only content: - pom: developer id uses the astubbs GitHub handle; root-pom copyright header bumped to 2026 and now says "Confluent, Inc. and contributors" to match the source-file copyright policy. - README template: fork notice promoted to the top as an IMPORTANT callout, stale status claims and upstream-only marketing removed (Confluent Cloud walkthrough, CSID maintenance line, commercial-support note), copyright line clarified, CI badge uncommented and retargeted. - README template variables: :base_url: now points at the fork; :base_confluent_url: added for historical upstream references. - README template: drop the Performance Tests section and the performance build-script bullet. The CHANGELOG include stays. - RELEASE.adoc: deleted. Documented the upstream's Semaphore-based release flow; the fork's release path is the GitHub Actions + central-publishing-maven-plugin pipeline in .github/workflows/publish.yml. Landscape context for library consumers: - Add a "When to use this library (vs KIP-932 Share Groups)" section near the top so visitors can tell immediately whether Parallel Consumer or Share Groups fits their use case. Share Groups are now GA on Confluent Cloud and Confluent Platform 8.2 / Apache Kafka 4.2 and cover most "parallel consumers independent of partition count" use cases at the broker level. PC still wins when per-key ordering with concurrency beyond partition count matters -- the motivation the broker does not address. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Test infrastructure for reproducing the multi-consumer silent stall after rebalance investigated in issue confluentinc#857. - MultiInstanceRebalanceTest: re-enabled the largeNumberOfInstances test, added a cooperative-assignor variant, tagged as performance, added a gentleChaosRebalance variant that reliably reproduces the stall - ManagedPCInstance: extracted into src/test-integration/java/.../utils so multiple tests can reuse the lifecycle harness (fresh Kafka container per test, thread-confinement checks, deterministic teardown) - ManagedPCInstanceLifecycleTest: dedicated lifecycle/teardown coverage for the new ManagedPCInstance harness - ShardManagerStaleContainerTest: reproducing the stale-container issue at the same offset under rebalance - ArchitectureTest: ArchUnit rule enforcing no raw Consumer field on any class that should use ThreadConfinedConsumer - ModelUtils: test helpers updated for the new harness - BrokerIntegrationTest: settle-time + consumer-close hooks used by the chaos-monkey tests - src/test-integration/resources/logback-test.xml: per-test-logging config for the diagnostic sessions Squashed from 35 commits of chronological investigation. Original per-commit history preserved at backup/pre-rebase-bugs-857. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ance Bundles the code changes attempted during the silent-stall investigation. Multiple provisional fixes landed; none eliminate the stall at 100% -- 10%-20% of aggressive-chaos-monkey runs still stall. See docs/BUG_857_INVESTIGATION.md for full narrative. Root cause is still open (current working hypothesis: Kafka eager-rebalance protocol interaction). Code changes included here: - ThreadConfinedConsumer (new): wrapper enforcing single-thread access to the underlying Consumer, surfacing thread-safety violations that were previously silent - ConsumerManager: accepts ThreadConfinedConsumer; CME fix in poll(); revert of explicit consumer close - BrokerPollSystem: duplicate-run() guard, started-flag relocation to start() to prevent double-submission - AbstractParallelEoSStreamProcessor: tryLock(commitCommand) in onPartitionsRevoked to avoid deadlock under rebalance; trace logging around partition-revoke / assignment cache update - EpochAndRecordsMap: always-update-assignment-cache + trace logging - PartitionStateManager / PartitionState: reset pausedForThrottling on partition assignment; counter adjustments on partition revoke - WorkManager / ShardManager: numberRecordsOutForProcessing adjustments, checkGroupId null check, init-cache fix, revert resetKafkaContainer - PCModule: wire ThreadConfinedConsumer through the DI Squashed from 35 commits of chronological investigation. Original per-commit history preserved at backup/pre-rebase-bugs-857. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…use still open Investigation artefacts for the multi-consumer silent stall after rebalance tracked in issue confluentinc#857. - docs/BUG_857_INVESTIGATION.md: consolidated investigation narrative across multiple hypotheses -- numberRecordsOutForProcessing counter drift, pausedForThrottling leak across rebalances, CME in poll(), commitCommand lock contention, ConsumerManager thread-safety, Kafka eager rebalance protocol interaction. Each hypothesis documents what was tested, what was fixed, and why the stall still reproduces at the residual ~10-20% rate. Current working hypothesis at the top of the doc. - AGENTS.md: note the investigation branch + reuse of ManagedPCInstance test harness. - pom.xml: test-scope dependency bumps needed by the expanded test suite (ArchUnit, mockito, etc). Squashed from 35 commits of chronological investigation. Original per-commit history preserved at backup/pre-rebase-bugs-857. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
a6ce9bf to
262629a
Compare
|
|
@astubbs I see that it's still work in progress? Sorry for troubling you so many times but this issue started to re-appear more frequently. |
Summary
Fixes the silent consumption stall reported in confluentinc/parallel-consumer#857 — after Kafka rebalances, consumption stops on certain partitions with no errors.
Result: aggressive chaos test passes 9/10 (90%), up from ~20% at the start.
Root causes found and fixed
Production code (7 fixes)
commitCommand deadlock (the Paused consumption across multiple consumers confluentinc/parallel-consumer#857 root cause) —
onPartitionsRevokedblocked onsynchronized(commitCommand)while the control thread held the lock mid-commit. Replaced withReentrantLock.tryLock()so the rebalance callback never blocks.ConcurrentModificationException prevention —
updateCache()(callsconsumer.groupMetadata()) moved afterpollingBroker=falseto prevent race withconsumer.wakeup().Counter drift —
numberRecordsOutForProcessingnot adjusted when partitions revoked. AddedadjustOutForProcessingOnRevoke()before shard cleanup.Throttle flag —
pausedForThrottlingnot reset on partition assignment. AddedBrokerPollSystem.onPartitionsAssigned().ThreadConfinedConsumer — new Lombok
@Delegatewrapper enforcing single-thread consumer access at runtime. Zero violations detected across all test runs, proving PC's thread discipline is correct.Raw consumer field removed —
AbstractParallelEoSStreamProcessorno longer holds a rawConsumer<K,V>. All access routed throughConsumerManagerviaPCModuleDI.ArchUnit rules — compile-time enforcement that only
ConsumerManager,ThreadConfinedConsumer, andParallelConsumerOptionsmay holdConsumer<K,V>fields. Also enforcesProducer<K,V>field isolation.Test infrastructure (7 improvements)
stopAsync()— chaos monkey no longer freezes for 30-40s duringclose()startedflag — moved tostart()to prevent double-submission ofrun()closePendingguard — prevents toggle during background closeTest results
Acceptance threshold for aggressive test: 80%. The remaining ~10% failure is the Kafka consumer group protocol under extreme membership churn (
assignedPartitions=0), not a PC bug.Files changed (22 files, +1702 / -186)
Production code:
AbstractParallelEoSStreamProcessor.java— deadlock fix, raw consumer removed, DI wiringBrokerPollSystem.java— throttle reset, trace loggingConsumerManager.java— subscribe methods, assignment cache, initPCModule.java— ThreadConfinedConsumer wiringThreadConfinedConsumer.java— new runtime thread-safety wrapperWorkManager.java— counter adjustment on revokeShardManager.java—countInflightForPartitions()helperPartitionState.java— epoch mismatch logging upgradePartitionStateManager.java— epoch trace loggingEpochAndRecordsMap.java— poll-time epoch trace loggingTest code:
MultiInstanceRebalanceTest.java— refactored to use ManagedPCInstance, chaos variantsManagedPCInstance.java— new shared test utilityManagedPCInstanceLifecycleTest.java— new focused lifecycle testShardManagerStaleContainerTest.java— new deterministic unit testsArchitectureTest.java— new ArchUnit rulesModelUtils.java— epoch-aware work container factoryDocumentation:
docs/BUG_857_INVESTIGATION.md— new full investigation reportAGENTS.md— development rules addedpom.xml— ArchUnit dependencyInvestigation history
The full investigation is documented in 36 commits on this branch. See
docs/BUG_857_INVESTIGATION.mdfor the narrative and the branch commit history for the step-by-step journey from 20% to 90%.🤖 Generated with Claude Code