fix: make committed offset accurate on partition assignment (cherry-pick Confluent #893) #42
astubbs wants to merge 10 commits into dev/ci-tweak
Conversation
Force-pushed 3eb1a9a to 94c35dd
Dependency Review: ✅ No vulnerabilities, license issues, or OpenSSF Scorecard issues found. Scanned files: none.
✅ Duplicate Code Report

Two engines run in parallel for cross-validation. Each has its own thresholds tuned to its baseline; the real safety net is the per-engine "max increase vs base" check.

✅ PMD CPD: no new clones introduced by this PR.
✅ jscpd (language-agnostic): no new clones introduced by this PR.

Powered by astubbs/duplicate-code-cross-check
// across tests under PIT's different ordering, since this class doesn't have base-class
// Awaitility.reset() cleanup).
// 45s: 8s mock-failure window + retry + PIT's JVM slowdown, with headroom.
Awaitility.await().atMost(Duration.ofSeconds(45)).untilAsserted(() -> {
parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTest.java:72
        .saslAuthenticationRetryTimeout(Duration.ofSeconds(30L))
        .build();
-   var parallelConsumer = new ParallelEoSStreamProcessor<String, String>(options);
+   parallelConsumer = new ParallelEoSStreamProcessor<>(options);
parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTest.java:51
    private void addRecords(MockConsumer<String, String> mockConsumer) {
-       for(int i = 0; i < 10; i++) {
+       for (int i = 0; i < 10; i++) {
            mockConsumer.addRecord(new org.apache.kafka.clients.consumer.ConsumerRecord<>(topic, 0, i, "key", "value"));
parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/MockConsumerTestWithEarlyClose.java:121
Codecov Report

✅ All modified and coverable lines are covered by tests.

@@           Coverage Diff            @@
##         dev/ci-tweak     #42  +/-  ##
===============================================
  Coverage          ?    77.14%
  Complexity        ?      1154
===============================================
  Files             ?        82
  Lines             ?      4191
  Branches          ?       387
===============================================
  Hits              ?      3233
  Misses            ?       767
  Partials          ?       191
❌ Mutation Testing (PIT) Report

PIT did not produce a report. Most commonly this means a test failed in the baseline (PIT runs all tests unmodified first to establish green) and PIT aborted before mutating. See the "Run PIT mutation testing" step logs for the failing test, then either fix it or add it to
Force-pushed fab5b0b to a35c7db
Republish under new Maven coordinates so the fork can be released to Maven Central independently of upstream confluentinc/parallel-consumer (which is no longer actively maintained).

Changes:
- groupId: io.confluent.parallelconsumer -> io.github.astubbs.parallelconsumer
- version: 0.5.3.4-SNAPSHOT -> 0.6.0.0-SNAPSHOT
- organization: Confluent, Inc. -> Antony Stubbs
- SCM URLs: confluentinc -> astubbs
- developer: antony.stubbs@gmail.com
- Display names: drop "Confluent" prefix
- License header template: now "Confluent, Inc. and contributors" (preserves original Apache 2.0 attribution while crediting fork contributors going forward)
- Add NOTICE file per Apache 2.0 §4(d) crediting Confluent as the original author
- README updated with new coordinates and a note explaining the fork

Java packages (io.confluent.parallelconsumer.*) and artifactIds are intentionally unchanged so cherry-picking PRs back upstream stays clean if they ever resume maintenance.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…cture

Replace Confluent's Semaphore CI with GitHub Actions. Includes:
- Parallel PR builds (unit/integration/performance) with fail-fast
- Quality gates: PMD CPD duplicate detection, file similarity check, SpotBugs, dependency vulnerability scanning, PIT mutation testing
- Maven cache with rotating save key to prevent stale-cache trap
- Maven HTTP timeouts (.mvn/maven.config) for connection reliability
- Codecov coverage tracking with drop prevention
- Claude Code Review and PR dependency check workflows
- Performance test infrastructure with self-hosted runner support
- Build scripts (bin/ci-build.sh, ci-unit-test.sh, etc.)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…IT stall

- EpochAndRecordsMap: skip records for partitions with no epoch assigned yet (race between poll and onPartitionsAssigned)
- VertxConcurrencyIT: increase wait timeout and add CountDownLatch protection to prevent CI stalls

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
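The null-epoch guard in the first bullet can be sketched as a small filter. This is an illustrative simplification, not the actual EpochAndRecordsMap internals; the class and method names here are invented for the example.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of the guard: records polled for a partition whose epoch has not been
// assigned yet (poll raced ahead of onPartitionsAssigned) are skipped rather
// than processed against a null epoch. Names are hypothetical stand-ins.
class EpochGuardSketch {

    static <R> Map<String, List<R>> filterToAssigned(Map<String, List<R>> polledByPartition,
                                                     Map<String, Long> partitionEpochs) {
        return polledByPartition.entrySet().stream()
                // no epoch recorded for this partition yet -> drop its records for now
                .filter(e -> partitionEpochs.get(e.getKey()) != null)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```

The skipped records are not lost: the consumer re-delivers them on a later poll once the assignment callback has run and the epoch exists.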
…ditions

Root-caused PIT's ~85s baseline cliff: MockConsumerTestWithEarlyClose and CommitTimeoutException leaked non-daemon threads that threw IllegalStateException on closed MockConsumer during the next test.

Fixes:
- MockConsumerTestWithEarlyClose/CommitTimeoutException: daemon threads, interrupt in finally, catch IllegalStateException in addRecords loop
- MockConsumerTestWithSaslAuthenticationException: scope Awaitility timeout locally, add @AfterEach close, shrink mock-failure window
- ParallelEoSSStreamProcessorRebalancedTest: remove empty close() override hiding base-class cleanup
- ProducerManagerTest: @AfterEach Awaitility.reset(), explicit timeouts
- PCMetricsTest: explicit atMost(120s) on bare await() calls
- PartitionOrderProcessingTest: merge await + assertion to fix race
- Base class: Awaitility.reset() in @AfterEach as general guard
- New EpochAndRecordsMapRaceTest for null-epoch guard

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
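The thread-hygiene pattern from the first bullet (daemon thread, swallow the closed-consumer IllegalStateException, interrupt in finally) can be sketched as follows. This is a minimal illustration, not the actual test code; the class and parameter names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of a test's background record-feeder that cannot leak into the next
// test: it is a daemon thread, it treats IllegalStateException from a closed
// MockConsumer as a normal teardown signal, and callers interrupt it in finally.
class FeederThreadSketch {

    static Thread startFeeder(Runnable addRecordsOnce, AtomicBoolean sawCloseException) {
        Thread feeder = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    addRecordsOnce.run();
                } catch (IllegalStateException e) {
                    // MockConsumer was closed under us: expected during teardown, exit quietly.
                    sawCloseException.set(true);
                    return;
                }
            }
        });
        feeder.setDaemon(true); // cannot keep the JVM (or PIT's baseline run) alive
        feeder.start();
        return feeder;
    }
}
```

In the test body the thread would be stopped with `try { ... } finally { feeder.interrupt(); feeder.join(1000); }`, which is the "interrupt in finally" part of the fix.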
…docs

- Rename from "Confluent Parallel Consumer" to "Kafka Parallel Consumer"
- Point issue tracker to astubbs/parallel-consumer, remove Confluent Slack link
- Add upstream PR analysis report
- Add CHANGELOG 0.6.0.0 section
- Fix AGENTS.md CI description, add build scripts and copyright rules
- Add Documented Solutions section to AGENTS.md
- Add solution docs: copyright header rules, CI security hardening
- Remove duplicate Releasing section in README_TEMPLATE.adoc
- Add multi-partition partial-skip test to EpochAndRecordsMapRaceTest
- Fix deriveCpKafkaImage Javadoc comment

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Switch check-dependencies.yml from pull_request_target to pull_request to prevent fork PRs running with base repo write permissions
- Pin all three custom actions to commit SHAs instead of mutable branch refs (dependencies-action@a09974c, duplicate-code-cross-check@d3140ef, duplicate-code-detection-tool@4e302e7)
- Gate publish.yml on CI success via workflow_run trigger instead of deploying on every master push regardless of test results
- Add try/catch and pre-release suffix stripping to deriveCpKafkaImage() so unexpected version strings fall back gracefully instead of crashing all integration tests with ExceptionInInitializerError
- Add Documented Solutions section to AGENTS.md for discoverability
- Add copyright header management solution doc

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
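The graceful-fallback idea behind the deriveCpKafkaImage() change can be sketched like this. All names here (the class, the method, the fallback tag, and the derived tag format) are invented for illustration and do not reflect the real implementation; only the shape matters: strip pre-release suffixes, and on any parse failure return a fallback instead of letting an exception escape a static initializer.

```java
// Hypothetical sketch of the fallback pattern described above.
class KafkaImageSketch {

    static final String FALLBACK_TAG = "7.7.0"; // assumed default, for illustration only

    // Map a Kafka version like "3.9.1" or "4.0.0-rc1" to an image tag.
    // Any unexpected string falls back instead of throwing, so a static
    // initializer using this can never raise ExceptionInInitializerError.
    static String deriveTag(String kafkaVersion) {
        try {
            String base = kafkaVersion.split("-")[0]; // drop "-rc1", "-SNAPSHOT", ...
            String[] parts = base.split("\\.");
            int major = Integer.parseInt(parts[0]);
            int minor = Integer.parseInt(parts[1]);
            return major + "." + minor + ".x";
        } catch (RuntimeException e) { // ArrayIndexOutOfBounds, NumberFormat, ...
            return FALLBACK_TAG;
        }
    }
}
```

The key design point is catching at the derivation boundary: a bad version string degrades one configuration value rather than failing every test that touches the class.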
…epare-deps cache warming

PR builds now run two tiers in parallel:
- Split suites (unit, integration, performance) on default Kafka 3.9.1
- Experimental Kafka 4.x compatibility check (non-blocking)

Push-to-master runs a single ci-build.sh (default Kafka version) to gate SNAPSHOT publishing. No more version matrix on master.

All jobs now use explicit cache/restore with rotating keys from prepare-deps. Eliminated all setup-java cache: 'maven' usage, which caused the frozen-cache bug (immutable keys could never overwrite an incomplete cache entry). prepare-deps now runs on push builds too, so master hits the reactor with a warm Maven Central cache for the vertx dependencies that previously caused consistent 120s timeouts.

Also skip unit tests in performance-test.sh so a flaky unit test can't cascade-cancel the other PR matrix jobs via fail-fast.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…plugin
Sets up the fork to publish both snapshot and release artifacts to Maven
Central (new Sonatype Central Portal) under the io.github.astubbs namespace.
Pipeline: a push to master triggers Build and Test; on success, the Publish
workflow deploys via `./mvnw -Pmaven-central clean deploy`. The
central-publishing-maven-plugin (0.10.0) handles both paths:
- SNAPSHOT versions: direct PUT to the snapshot endpoint
(central.sonatype.com/repository/maven-snapshots/)
- RELEASE versions: bundle + validate + auto-publish via Portal API,
plus git tag and GitHub release
The examples module and its children are excluded from deploy (sample code,
not library artifacts): via -pl on the mvn command, and via
maven.deploy.skip, maven.install.skip, gpg.skip, and
central-publishing-plugin skipPublishing=true in the examples pom.
maven-jar-plugin test-jar execution gets skipIfEmpty=true so the parent
pom and examples (no test classes) do not emit empty signed test-jars
that Sonatype rejects. Real test-jars for core, vertx, reactor, and mutiny
publish unchanged.
Prerequisite on the Central Portal: the io.github.astubbs namespace must
have both verification AND the separate "Enable SNAPSHOT Publishing"
toggle switched on. Without the snapshot toggle the snapshot endpoint
returns 403 even with valid credentials.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Rebrand audit -- pom, README template, drop upstream-only content:

- pom: developer id uses the astubbs GitHub handle; root-pom copyright header bumped to 2026 and now says "Confluent, Inc. and contributors" to match the source-file copyright policy.
- README template: fork notice promoted to the top as an IMPORTANT callout; stale status claims and upstream-only marketing removed (Confluent Cloud walkthrough, CSID maintenance line, commercial-support note); copyright line clarified; CI badge uncommented and retargeted.
- README template variables: :base_url: now points at the fork; :base_confluent_url: added for historical upstream references.
- README template: drop the Performance Tests section and the performance build-script bullet. The CHANGELOG include stays.
- RELEASE.adoc: deleted. It documented the upstream's Semaphore-based release flow; the fork's release path is the GitHub Actions + central-publishing-maven-plugin pipeline in .github/workflows/publish.yml.

Landscape context for library consumers:

- Add a "When to use this library (vs KIP-932 Share Groups)" section near the top so visitors can tell immediately whether Parallel Consumer or Share Groups fits their use case. Share Groups are now GA on Confluent Cloud and Confluent Platform 8.2 / Apache Kafka 4.2 and cover most "parallel consumers independent of partition count" use cases at the broker level. PC still wins when per-key ordering with concurrency beyond partition count matters -- the motivation the broker does not address.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tinc#893)

Cherry-pick of confluentinc#893 (author: sangreal).

Race condition in PartitionState: createOffsetAndMetadata() called tryToEncodeOffsets() and getOffsetToCommit() separately. Between the two calls, incompletes could drain, causing a higher offset to be committed than intended. After rebalance, the consumer fetches a non-existent offset and triggers auto.offset.reset (data loss or replay).

Fix: tryToEncodeOffsets() now calls getOffsetToCommit() once at the top and returns a Tuple<Optional<String>, Long> so the offset and payload are computed atomically from the same state snapshot.

Upstream PR: confluentinc#893. Approved by Roman Kolesnev, run in production for >1 week.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Force-pushed a35c7db to 44c72f5
Summary
Cherry-pick of confluentinc/parallel-consumer#893 by sangreal.
PartitionState.createOffsetAndMetadata(): tryToEncodeOffsets() and getOffsetToCommit() were called separately. Between the two calls, incompletes could drain, causing a higher offset to be committed than intended. After rebalance, the consumer fetches a non-existent offset and triggers auto.offset.reset (data loss or replay).

tryToEncodeOffsets() now calls getOffsetToCommit() once and returns both the encoded payload and offset as a Tuple, ensuring they're computed atomically from the same state snapshot.

Upstream PR approved by Roman Kolesnev, run in production for >1 week with zero recurrence.
Resolves upstream issues: confluentinc#777 (partition revocation duplicates), partially confluentinc#809/confluentinc#833 (sporadic commit timeouts).
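The before/after shape of the fix can be sketched as follows. This is a simplified illustration, not the actual PartitionState code: the class, the field, and the local Tuple record are invented for the example; only the single-read-then-pair-return structure mirrors the fix.

```java
import java.util.Optional;

// Illustrative sketch of the race and the fix described above.
class OffsetCommitSketch {

    // Minimal stand-in for the library's Tuple type.
    record Tuple<L, R>(L left, R right) {}

    // Stand-in for the mutable state that incompletes draining can advance.
    private long nextExpectedOffset = 100L;

    long getOffsetToCommit() {
        return nextExpectedOffset;
    }

    // Buggy shape (before): the payload was encoded against one read of the
    // offset, and the committed offset came from a *second* read later. If
    // incompletes drained in between, the two reads disagreed and a higher
    // offset than the payload described could be committed.
    //
    // Fixed shape (after): read the offset once, encode against that same
    // snapshot, and return both together so they can never diverge.
    Tuple<Optional<String>, Long> tryToEncodeOffsets() {
        long offsetSnapshot = getOffsetToCommit();    // single read
        Optional<String> payload = encodeFor(offsetSnapshot);
        return new Tuple<>(payload, offsetSnapshot);  // atomic pair
    }

    private Optional<String> encodeFor(long offset) {
        return Optional.of("encoded@" + offset); // placeholder for real encoding
    }
}
```

The caller then commits the pair as one unit, so the committed offset always matches the offset the encoded metadata was built for, even if state mutates concurrently after the snapshot.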
depends on #34
Test plan
- Unit tests (PartitionStateTest, offset encoding tests)
- Integration tests (TransactionAndCommitModeTest, CloseAndOpenOffsetTest)

🤖 Generated with Claude Code