fix: replace stale container at same offset after rebalance (#909) #31
astubbs wants to merge 11 commits into master-confluent
Conversation
cserspring left a comment:

Thanks, I reviewed it and it looks good. Feel free to merge.
Force-pushed from 6975295 to 18d1651
Dependency Review

The following issues were found:

License issues:

- .github/workflows/performance.yml
- .github/workflows/publish.yml
- parallel-consumer-examples/parallel-consumer-example-core/pom.xml
- parallel-consumer-examples/parallel-consumer-example-metrics/pom.xml
- parallel-consumer-examples/parallel-consumer-example-reactor/pom.xml
- parallel-consumer-examples/parallel-consumer-example-streams/pom.xml
- parallel-consumer-examples/parallel-consumer-example-vertx/pom.xml
- parallel-consumer-examples/pom.xml
- parallel-consumer-mutiny/pom.xml
- parallel-consumer-reactor/pom.xml
- parallel-consumer-vertx/pom.xml
- pom.xml

OpenSSF Scorecard
✅ Duplicate Code Report

Two engines run in parallel for cross-validation. Each has its own thresholds tuned to its baseline; the real safety net is the per-engine "max increase vs base" check.

✅ PMD CPD

| Metric | PR | Base | Change |
|---|---|---|---|
| Clones | 77 | 78 | 🙂 -1 |
| Duplicated lines | 1105 | 1127 | ❤️ -22 |
| Duplication | 3.84% | 3.85% | 🙂 -0.01% |

| Rule | Limit | Status |
|---|---|---|
| Max duplication | 4% | ✅ Pass (3.84%) |
| Max increase vs base | +0.1% | ✅ Pass (-0.01%) |
⚠️ 8 new clones introduced

- 17 lines: parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/EpochAndRecordsMapRaceTest.java:37 <-> parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ProcessingShardStaleReplacementTest909.java:33
- 15 lines: parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithEarlyClose.java:1 <-> parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithSaslAuthenticationException.java:1
- 14 lines: parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithEarlyClose.java:64 <-> parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithSaslAuthenticationException.java:80
- 15 lines: parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithCommitTimeoutException.java:121 <-> parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithEarlyClose.java:121
- 21 lines: parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTest.java:51 <-> parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithSaslAuthenticationException.java:97
- 12 lines: parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTest.java:72 <-> parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithSaslAuthenticationException.java:121
- 16 lines: parallel-consumer-reactor/pom.xml:16 <-> parallel-consumer-vertx/pom.xml:21
- 18 lines: parallel-consumer-mutiny/pom.xml:20 <-> parallel-consumer-vertx/pom.xml:20
Powered by astubbs/duplicate-code-cross-check
Review comment on parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithEarlyClose.java:121:

```diff
 private void addRecords(MockConsumer<String, String> mockConsumer) {
-    for(int i = 0; i < 10; i++) {
+    for (int i = 0; i < 10; i++) {
         mockConsumer.addRecord(new org.apache.kafka.clients.consumer.ConsumerRecord<>(topic, 0, i, "key", "value"));
```
Codecov Report

❌ Patch coverage is
Additional details and impacted files

```
@@            Coverage Diff             @@
##    master-confluent    #31    +/- ##
===================================================
  Coverage            ?    77.02%
  Complexity          ?      1153
===================================================
  Files               ?        82
  Lines               ?      4195
  Branches            ?       387
===================================================
  Hits                ?      3231
  Misses              ?       772
  Partials            ?       192
```

Flags with carried forward coverage won't be shown.
❌ Mutation Testing (PIT) Report

PIT did not produce a report. Most commonly this means a test failed in the baseline (PIT runs all tests unmodified first to establish green) and PIT aborted before mutating. See the "Run PIT mutation testing" step logs for the failing test, then either fix it or add it to
Republish under new Maven coordinates so the fork can be released to Maven Central independently of upstream confluentinc/parallel-consumer (which is no longer actively maintained).

Changes:

- groupId: io.confluent.parallelconsumer -> io.github.astubbs.parallelconsumer
- version: 0.5.3.4-SNAPSHOT -> 0.6.0.0-SNAPSHOT
- organization: Confluent, Inc. -> Antony Stubbs
- SCM URLs: confluentinc -> astubbs
- developer: antony.stubbs@gmail.com
- Display names: drop "Confluent" prefix
- License header template: now "Confluent, Inc. and contributors" (preserves original Apache 2.0 attribution while crediting fork contributors going forward)
- Add NOTICE file per Apache 2.0 §4(d) crediting Confluent as the original author
- README updated with new coordinates and a note explaining the fork

Java packages (io.confluent.parallelconsumer.*) and artifactIds are intentionally unchanged so cherry-picking PRs back upstream stays clean if they ever resume maintenance.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…cture

Replace Confluent's Semaphore CI with GitHub Actions. Includes:

- Parallel PR builds (unit/integration/performance) with fail-fast
- Quality gates: PMD CPD duplicate detection, file similarity check, SpotBugs, dependency vulnerability scanning, PIT mutation testing
- Maven cache with rotating save key to prevent stale-cache trap
- Maven HTTP timeouts (.mvn/maven.config) for connection reliability
- Codecov coverage tracking with drop prevention
- Claude Code Review and PR dependency check workflows
- Performance test infrastructure with self-hosted runner support
- Build scripts (bin/ci-build.sh, ci-unit-test.sh, etc.)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…IT stall

- EpochAndRecordsMap: skip records for partitions with no epoch assigned yet (race between poll and onPartitionsAssigned)
- VertxConcurrencyIT: increase wait timeout and add CountDownLatch protection to prevent CI stalls

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
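The null-epoch guard mentioned in the first bullet can be sketched as below. This is a hedged illustration only: `EpochGuardSketch`, `onPartitionsAssigned`, and `filterByEpoch` loosely model the real `EpochAndRecordsMap`, and the integer "records" stand in for real consumer records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the guard: records polled for a partition before
// its assignment callback has registered an epoch are skipped instead of
// dereferencing a null epoch. Names only loosely model EpochAndRecordsMap.
class EpochGuardSketch {
    private final Map<Integer, Long> epochByPartition = new HashMap<>();

    void onPartitionsAssigned(int partition, long epoch) {
        epochByPartition.put(partition, epoch);
    }

    // returns the records that were accepted (their partition had an epoch)
    List<Integer> filterByEpoch(Map<Integer, List<Integer>> polledByPartition) {
        List<Integer> accepted = new ArrayList<>();
        for (Map.Entry<Integer, List<Integer>> e : polledByPartition.entrySet()) {
            Long epoch = epochByPartition.get(e.getKey());
            if (epoch == null) {
                // race: poll returned records for a partition whose
                // onPartitionsAssigned callback has not run yet -- skip
                continue;
            }
            accepted.addAll(e.getValue());
        }
        return accepted;
    }
}
```

The key design point is that the skipped records are not lost: the next poll after the assignment callback runs will deliver them again with a valid epoch.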
…ditions

Root-caused PIT's ~85s baseline cliff: MockConsumerTestWithEarlyClose and CommitTimeoutException leaked non-daemon threads that threw IllegalStateException on closed MockConsumer during the next test.

Fixes:

- MockConsumerTestWithEarlyClose/CommitTimeoutException: daemon threads, interrupt in finally, catch IllegalStateException in addRecords loop
- MockConsumerTestWithSaslAuthenticationException: scope Awaitility timeout locally, add @AfterEach close, shrink mock-failure window
- ParallelEoSSStreamProcessorRebalancedTest: remove empty close() override hiding base-class cleanup
- ProducerManagerTest: @AfterEach Awaitility.reset(), explicit timeouts
- PCMetricsTest: explicit atMost(120s) on bare await() calls
- PartitionOrderProcessingTest: merge await + assertion to fix race
- Base class: Awaitility.reset() in @AfterEach as general guard
- New EpochAndRecordsMapRaceTest for null-epoch guard

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
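The thread-hygiene pattern from the first fix (daemon thread, interrupt in a finally block, tolerate IllegalStateException from a closed consumer) can be sketched as below. This is an illustrative reconstruction, not the actual test code: `FeedTarget` is a hypothetical stand-in for the real MockConsumer.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the test-thread hygiene described above: the
// feeder thread is a daemon, treats IllegalStateException from a closed
// consumer as a normal stop signal, and is interrupted in a finally block
// so it cannot leak into the next test and poison it.
class FeederHygieneSketch {
    static class FeedTarget {
        volatile boolean closed;
        final AtomicInteger added = new AtomicInteger();

        void addRecord() {
            if (closed) throw new IllegalStateException("consumer already closed");
            added.incrementAndGet();
        }
    }

    static int runOnce(FeedTarget target) {
        Thread feeder = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    target.addRecord();
                }
            } catch (IllegalStateException expected) {
                // the test closed the consumer underneath us: just stop,
                // instead of surfacing the exception in a later test
            }
        });
        feeder.setDaemon(true); // never blocks JVM exit if cleanup is missed
        feeder.start();
        try {
            Thread.sleep(50);   // stand-in for the test body
            target.closed = true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            feeder.interrupt(); // always stop the feeder, even on failure
            try {
                feeder.join(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return target.added.get();
    }
}
```

Without the daemon flag and the finally-block interrupt, a failed test leaves the feeder running against a closed consumer, which is exactly the cross-test IllegalStateException the commit describes.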
…docs

- Rename from "Confluent Parallel Consumer" to "Kafka Parallel Consumer"
- Point issue tracker to astubbs/parallel-consumer, remove Confluent Slack link
- Add upstream PR analysis report
- Add CHANGELOG 0.6.0.0 section
- Fix AGENTS.md CI description, add build scripts and copyright rules
- Add Documented Solutions section to AGENTS.md
- Add solution docs: copyright header rules, CI security hardening
- Remove duplicate Releasing section in README_TEMPLATE.adoc
- Add multi-partition partial-skip test to EpochAndRecordsMapRaceTest
- Fix deriveCpKafkaImage Javadoc comment

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Switch check-dependencies.yml from pull_request_target to pull_request to prevent fork PRs running with base repo write permissions
- Pin all three custom actions to commit SHAs instead of mutable branch refs (dependencies-action@a09974c, duplicate-code-cross-check@d3140ef, duplicate-code-detection-tool@4e302e7)
- Gate publish.yml on CI success via workflow_run trigger instead of deploying on every master push regardless of test results
- Add try/catch and pre-release suffix stripping to deriveCpKafkaImage() so unexpected version strings fall back gracefully instead of crashing all integration tests with ExceptionInInitializerError
- Add Documented Solutions section to AGENTS.md for discoverability
- Add copyright header management solution doc

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
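The defensive version mapping described for deriveCpKafkaImage() can be sketched as below. This is a hedged illustration: the real method lives in the integration-test support code, and the parsing rules, class name, and fallback tag here are assumptions, not the actual values.

```java
// Hypothetical sketch of the graceful-fallback version mapping described
// above. The mapping ("major.minor.x") and the fallback tag are invented
// for illustration; only the try/catch + suffix-stripping shape mirrors
// what the commit describes.
class CpImageTagSketch {
    static final String FALLBACK_TAG = "latest";

    static String deriveTag(String kafkaVersion) {
        try {
            // strip a pre-release suffix such as "-SNAPSHOT" or "-rc1"
            String base = kafkaVersion.split("-", 2)[0];
            String[] parts = base.split("\\.");
            int major = Integer.parseInt(parts[0]);
            int minor = Integer.parseInt(parts[1]);
            return major + "." + minor + ".x";
        } catch (RuntimeException e) {
            // an unparseable version string must not take down every
            // integration test via ExceptionInInitializerError -- fall back
            return FALLBACK_TAG;
        }
    }
}
```

Catching RuntimeException in a static-initialization path matters here: an uncaught exception in a static initializer surfaces as ExceptionInInitializerError and permanently poisons the class for every test in the JVM.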
…epare-deps cache warming

PR builds now run two tiers in parallel:

- Split suites (unit, integration, performance) on default Kafka 3.9.1
- Experimental Kafka 4.x compatibility check (non-blocking)

Push-to-master runs a single ci-build.sh (default Kafka version) to gate SNAPSHOT publishing. No more version matrix on master.

All jobs now use explicit cache/restore with rotating keys from prepare-deps. Eliminated all setup-java cache: 'maven' usage, which caused the frozen-cache bug (immutable keys could never overwrite an incomplete cache entry). prepare-deps now runs on push builds too, so master hits the reactor with a warm Maven Central cache for the vertx dependencies that previously caused consistent 120s timeouts.

Also skip unit tests in performance-test.sh so a flaky unit test can't cascade-cancel the other PR matrix jobs via fail-fast.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…plugin
Sets up the fork to publish both snapshot and release artifacts to Maven
Central (new Sonatype Central Portal) under the io.github.astubbs namespace.
Pipeline: a push to master triggers Build and Test; on success, the Publish
workflow deploys via `./mvnw -Pmaven-central clean deploy`. The
central-publishing-maven-plugin (0.10.0) handles both paths:
- SNAPSHOT versions: direct PUT to the snapshot endpoint
(central.sonatype.com/repository/maven-snapshots/)
- RELEASE versions: bundle + validate + auto-publish via Portal API,
plus git tag and GitHub release
The examples module and its children are excluded from deploy (sample code,
not library artifacts): via -pl on the mvn command, and via
maven.deploy.skip, maven.install.skip, gpg.skip, and
central-publishing-plugin skipPublishing=true in the examples pom.
maven-jar-plugin test-jar execution gets skipIfEmpty=true so the parent
pom and examples (no test classes) do not emit empty signed test-jars
that Sonatype rejects. Real test-jars for core, vertx, reactor, and mutiny
publish unchanged.
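The skipIfEmpty arrangement described above can be sketched as a pom fragment. This is a minimal sketch: the execution id and surrounding plugin management are assumptions, but `skipIfEmpty` is a standard maven-jar-plugin parameter.

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-jar-plugin</artifactId>
  <executions>
    <execution>
      <id>default-test-jar</id>
      <goals>
        <goal>test-jar</goal>
      </goals>
      <configuration>
        <!-- modules with no test classes (parent pom, examples) emit no
             empty signed test-jar for Sonatype to reject; modules with
             real test classes still publish theirs unchanged -->
        <skipIfEmpty>true</skipIfEmpty>
      </configuration>
    </execution>
  </executions>
</plugin>
```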
Prerequisite on the Central Portal: the io.github.astubbs namespace must
have both verification AND the separate "Enable SNAPSHOT Publishing"
toggle switched on. Without the snapshot toggle the snapshot endpoint
returns 403 even with valid credentials.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Rebrand audit -- pom, README template, drop upstream-only content:

- pom: developer id uses the astubbs GitHub handle; root-pom copyright header bumped to 2026 and now says "Confluent, Inc. and contributors" to match the source-file copyright policy.
- README template: fork notice promoted to the top as an IMPORTANT callout; stale status claims and upstream-only marketing removed (Confluent Cloud walkthrough, CSID maintenance line, commercial-support note); copyright line clarified; CI badge uncommented and retargeted.
- README template variables: :base_url: now points at the fork; :base_confluent_url: added for historical upstream references.
- README template: drop the Performance Tests section and the performance build-script bullet. The CHANGELOG include stays.
- RELEASE.adoc: deleted. It documented the upstream's Semaphore-based release flow; the fork's release path is the GitHub Actions + central-publishing-maven-plugin pipeline in .github/workflows/publish.yml.

Landscape context for library consumers:

- Add a "When to use this library (vs KIP-932 Share Groups)" section near the top so visitors can tell immediately whether Parallel Consumer or Share Groups fits their use case. Share Groups are now GA on Confluent Cloud and Confluent Platform 8.2 / Apache Kafka 4.2 and cover most "parallel consumers independent of partition count" use cases at the broker level. PC still wins when per-key ordering with concurrency beyond partition count matters -- the motivation the broker does not address.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…set (confluentinc#909)

Reproduces the exact race condition from PR confluentinc#909: after rebalance, a stale WorkContainer at the same offset blocks fresh work from being added. Verifies that stale entries are replaced and non-stale duplicates are still dropped.

See confluentinc#909

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Force-pushed from 18d1651 to 587b817
✅ SpotBugs Report

No bugs found (new bugs only — baseline from base branch excluded).
Summary
Cherry-pick of @cserspring's fix from upstream confluentinc/parallel-consumer#909, with an added regression test.
The bug: After a rebalance, `removeStaleContainers()` can't clean shards that don't exist yet. When the control thread resumes processing an old batch, it adds a stale `WorkContainer` to a newly-created shard. The next poll's fresh container at the same offset is then dropped because `addWorkContainer` sees "entry already exists." The record is permanently lost until pod restart.

The fix: In `ProcessingShard.addWorkContainer()`, check if the existing entry is stale (wrong epoch) and replace it instead of dropping the new one.

Test plan

`ProcessingShardStaleReplacementTest909` passes (2 tests)

cc @cserspring — your fix from the upstream PR has been adopted in this fork. Thank you for the detailed race condition analysis!
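The fix can be sketched as a minimal model. This is an illustration, not the actual ProcessingShard code: the class, the `WorkContainer` record, and the epoch bookkeeping here are simplified stand-ins for the real internals.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal model of the stale-replacement fix: a shard keyed
// by offset, where each container carries the epoch it was created under.
class ProcessingShardSketch {
    record WorkContainer(long offset, long epoch, String payload) {}

    private final Map<Long, WorkContainer> entries = new HashMap<>();
    private long currentEpoch;

    void onRebalance(long newEpoch) {
        currentEpoch = newEpoch;
    }

    // Returns true if the container was added (or replaced a stale entry),
    // false if it was dropped as a genuine same-epoch duplicate.
    boolean addWorkContainer(WorkContainer wc) {
        WorkContainer existing = entries.get(wc.offset());
        if (existing != null && existing.epoch() == currentEpoch) {
            // same-epoch duplicate: keep the existing container (old behavior)
            return false;
        }
        // either no entry yet, or the entry is stale (created under an older
        // epoch, e.g. by the control thread finishing a pre-rebalance batch):
        // the fresh container must win, otherwise the record is lost
        entries.put(wc.offset(), wc);
        return true;
    }

    WorkContainer get(long offset) {
        return entries.get(offset);
    }
}
```

The epoch comparison is what distinguishes the two "entry already exists" cases the bug description conflates: a stale leftover from before the rebalance (replace) versus a real duplicate from the current epoch (drop).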
🤖 Generated with Claude Code